potiuk commented on PR #37855:
URL: https://github.com/apache/airflow/pull/37855#issuecomment-1987142108

   > @potiuk initially I was going to write a check that all pydantic models we 
define can round trip. However, as yet some pydantic models do not have types 
defined (as in DAT.THE_MODEL). I then pulled back and only enabled the check for 
pydantic models that have a type, which is stored in those private vars.
   
   Well. The thing is (and I explained it earlier to @bolkedebruin) - not all 
serializations are equal. Serialization can be employed in various ways, and 
there are cases where, for ser-de, the "de" part is not needed - only the "ser".
   
   In the case of our models, we do not **really** need the `de` part. It's 
pretty much useless, and the round trip is not expected to work. So I'd even say 
that we could (if @bolkedebruin agrees to it) completely remove the whole 
Pydantic thing from `serde` (or its predecessor `serialized_objects.py`) and 
simply, straightforwardly use `from_orm`. It was added here based on earlier 
discussions with Bolke to keep serialization in one place, but I think it's a 
bit over the top for what we need it for.
   
   So the whole thing here is that we ONLY use Pydantic here because it allows 
us to automatically serialize an arbitrary SQLAlchemy ORM model to a non-ORM, 
"data-only" Pydantic counterpart. The nice thing it does is that it will 
automatically follow the foreign-key relationships and bring us all the 
related objects already nicely retrieved from the ORM, without writing a single 
line of code to do so. All it needs is that we write a Pydantic counterpart to 
the object we want to serialize and send from the Internal API component to 
whatever component needs to use it.
   
   Say, for example, DagRunPydantic (I omit irrelevant fields):
   
   ```
   class DagRunPydantic(BaseModelPydantic):
       ...
       consumed_dataset_events: List[DatasetEventPydantic]  # noqa: UP006
       ...
   ```
   
   How it works is: when you have a `DagRun` ORM object (inside the Internal 
API component) and want to send it as a response to the caller (say, the 
Worker's LocalTaskJob that wants to place dag_run in the context it passes to 
the task), you call the `from_orm` method (in Pydantic v2 it became 
`model_validate` with `from_attributes=True`) and Pydantic will do all the 
heavy lifting: it will replace all the ORM fields (ints/floats/strings etc.) 
with their "non-ORM" counterparts, and it will also follow all the relations to 
retrieve linked ORM objects and recursively convert them into Pydantic models.
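   To make the mechanism concrete, here is a minimal stdlib-only sketch of the same idea - attribute-based conversion that recursively follows a relationship. All class and field names here are illustrative stand-ins, not Airflow's real models, and Pydantic does this generically rather than through a hand-written converter:

   ```
   from dataclasses import dataclass

   # Hypothetical stand-ins for DB-backed rows; in Airflow these would be
   # SQLAlchemy models with a relationship between them.
   class DatasetEventORM:
       def __init__(self, extra: dict) -> None:
           self.extra = extra

   class DagRunORM:
       def __init__(self, run_id: str, events: list) -> None:
           self.run_id = run_id
           self.consumed_dataset_events = events

   # Data-only counterparts, playing the role of the *Pydantic models.
   @dataclass
   class DatasetEventModel:
       extra: dict

   @dataclass
   class DagRunModel:
       run_id: str
       consumed_dataset_events: list

   def dag_run_from_orm(orm: DagRunORM) -> DagRunModel:
       # Mimics what Pydantic's from_orm / model_validate(from_attributes=True)
       # does for us automatically: copy scalar attributes and recursively
       # convert the objects reached through the relationship.
       return DagRunModel(
           run_id=orm.run_id,
           consumed_dataset_events=[
               DatasetEventModel(extra=e.extra) for e in orm.consumed_dataset_events
           ],
       )

   dag_run = dag_run_from_orm(
       DagRunORM("manual__2024-01-01", [DatasetEventORM({"test": "x"})])
   )
   assert dag_run.consumed_dataset_events[0].extra["test"] == "x"
   ```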
   
   So what you get from passing a `DagRun` through Pydantic's `from_orm`, as 
something to send over the wire, is a DagRunPydantic object with a list of 
DatasetEventPydantic objects - all nicely serializable, and with all the 
connected objects retrieved.
   
   And the nicest thing for the receiving end (say the `PythonOperator` 
execute method that will receive the `dag_run` object as a `DagRunPydantic` 
one) is that it will not even realise that what it got is a non-ORM, Pydantic 
set of objects. It will be able to use it for `READ` purposes in exactly the 
same way as it did before. Say `for event in dag_run.consumed_dataset_events` 
will yield `DatasetEventPydantic` objects, and code that did:
   
   ```
   test_extras: list[str] = []
   for event in dag_run.consumed_dataset_events:
       test_extras.append(event.extras["test"])
   ```
   
   will continue working.
   
   And the most important thing here is that we have **absolutely no need** for 
the `de` part of `serde` there. Zero. NIL. From the local task's perspective, 
we need a read-only copy of all the ORM models. Any time we try to change it 
directly from the local task's `execute`, it's a **misuse**. While some of our 
users could have creatively done that, it will stop working when the Internal 
API is used. And that's not even a bug or breaking compatibility - it's a 
FEATURE of the internal-api implementation. The whole purpose of the Internal 
API component is to block those kinds of misuse. This is the whole reason why 
we implement the Internal API - so that a DAG author cannot manipulate the 
metadata DB directly; they should only do that via a small list of 
internal_api-decorated calls that are the "allowed" ways of doing it.
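   A tiny sketch (with hypothetical names) of why the read-only copy blocks that misuse: mutating the detached, data-only copy on the worker side never reaches the DB-backed object held inside the Internal API component:

   ```
   # Hypothetical stand-in for a DB-backed row inside the Internal API component.
   class TaskInstanceORM:
       def __init__(self) -> None:
           self.state = "running"

   orm_ti = TaskInstanceORM()

   # The detached, data-only copy sent over the wire to the worker.
   ti_copy = {"dag_id": "my_dag", "task_id": "my_task", "state": orm_ti.state}

   # A DAG author "creatively" mutating the copy from execute()...
   ti_copy["state"] = "success"

   # ...changes only the local copy; the source of truth is untouched.
   assert orm_ti.state == "running"
   assert ti_copy["state"] == "success"
   ```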
   
   So - an `internal_api`-decorated call will **never** deserialize the 
Pydantic models into ORM ones. There is no need; actually, it would be harmful. 
What the `internal_api` calls do is pass primitives to the Internal API 
component and ask it to perform DB operations on them. Say we want to update 
the state of a `TaskInstance` - we will never ever send a `TaskInstancePydantic` 
via an internal_api call from the local task job. Instead we will call 
`update_state(dag_id, task_id, run_id, new_state)` - none of that requires 
deserializing the `TaskInstancePydantic` object we already have (we received it 
earlier via another internal_api call).
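   That calling convention can be sketched like so - the payload carries only primitives that identify the row plus the new value. The `update_state` name comes from the example above; the request shape is entirely hypothetical, not the real internal_api wire format:

   ```
   import json

   def build_update_state_request(
       dag_id: str, task_id: str, run_id: str, new_state: str
   ) -> str:
       # Only primitives cross the wire - there is no Pydantic or ORM
       # object for the receiving side to deserialize.
       return json.dumps(
           {
               "method": "update_state",
               "params": {
                   "dag_id": dag_id,
                   "task_id": task_id,
                   "run_id": run_id,
                   "new_state": new_state,
               },
           }
       )

   payload = build_update_state_request(
       "my_dag", "my_task", "manual__2024-01-01", "success"
   )
   decoded = json.loads(payload)
   assert decoded["params"]["new_state"] == "success"
   ```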
   
   
   So - what I'd say we try to do here (if finally @bolkedebruin agrees with 
it) is to completely remove the Pydantic branch from serde and just directly 
call the serialization methods inside the internal_api call. I have not managed 
to convince Bolke before that this should be the case, but maybe we can agree 
to it and simplify (or rather completely remove) the serde <> internal api 
relation. I think it's completely not needed, to be honest, but for some 
strange reason we insist on making everything related to serialization go 
through `serde` (previously `airflow/serialization/serialized_objects.py`) as 
the only serialization solution we have.
   
   I believe it's inevitable that we will have a number of disconnected places 
where serialization is done in a way that better suits the case it's needed for 
(in this particular case, we **only** use Pydantic because it speaks SQLAlchemy 
out-of-the-box and can convert an arbitrary SQLAlchemy object - including its 
relations - to its counterpart, provided that we have a corresponding Pydantic 
definition of that model).
   
   I think the Pydantic part of serde/serializing DagRuns has already been 
guarded by a feature flag - so we could easily remove it and completely 
decouple `airflow serialization <-> internal_api_serialization` - if we agree 
that's the right thing to do (I have thought so from the very beginning).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
