VladaZakharova opened a new issue, #60941:
URL: https://github.com/apache/airflow/issues/60941
### Apache Airflow version
3.1.6
### If "Other Airflow 3 version" selected, which one?
3.0.1
### What happened?
The following system tests are not properly parsed by the DAG processor in
Airflow 3.1.0:
- cloud_memorystore_redis
- google_analytics_admin
- vertex_ai_experiment_service_dag
- gcp_vision_annotate_image
When a system test passes not only primitive-type objects (for example, an
object of type Instance, or any Enum value nested inside some object) as
parameters to an operator, the DAG processor does not parse the file at all.
No error is shown in the UI either, nor anything at all regarding this DAG.
The only exception that can be caught in the Breeze logs of the DAG processor
(which does not even mention which system test failed to parse) is:
```
dag-processor-manager Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 387, in _fork_main
    target()
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/dag_processing/processor.py", line 187, in _parse_file_entrypoint
    comms_decoder.send(result)
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/comms.py", line 190, in send
    frame_bytes = frame.as_bytes()
                  ^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/comms.py", line 148, in as_bytes
    self.req_encoder.encode_into(self, buffer, 4)
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/comms.py", line 125, in _msgpack_enc_hook
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
NotImplementedError: Objects of type <enum 'Type'> are not supported
```
The problem seems to occur when these "complicated" objects are passed via
parameters that are also listed in the operator's `templated_fields`.
It looks like the DAG processor parses the whole system test and tries to
serialize the variables from `templated_fields` using Jinja templating, in
order to then send them to the worker (I am guessing here).
When those parameters are removed from `templated_fields`, the error disappears.
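For illustration, here is a minimal sketch of the pattern that triggers the
failure (simplified, not the actual provider source; the attribute on
operators is `template_fields`):
```
# Simplified sketch, NOT the actual provider code: an operator whose
# template_fields contains a parameter that may receive a non-primitive value
# (e.g. a proto Enum member). The DAG processor serializes the values of these
# fields when sending the parsed DAG onward, which is where the msgpack
# enc_hook from the traceback above raises NotImplementedError.
from airflow.sdk import BaseOperator  # Airflow 3 Task SDK import


class ExampleFailoverOperator(BaseOperator):
    template_fields = ("instance", "data_protection_mode")

    def __init__(self, *, instance: str, data_protection_mode, **kwargs):
        super().__init__(**kwargs)
        self.instance = instance
        # A proto Enum member stored here is what the DAG processor later
        # fails to serialize.
        self.data_protection_mode = data_protection_mode
```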
Right after the release of Airflow 3.0.0 we saw the same error, but when
passing values from the worker to the trigger and back. An issue was created,
and the Community's decision was to adjust the code to the current
serialization, because adding custom serializers for every provider would take
a lot of time: there are a bunch of different types and we can't cover all of
them -> https://github.com/apache/airflow/issues/54781.
A new way of serialization was added in Airflow 3.1.0 that throws this error
for DAGs that contain Enums and other non-serializable objects:
https://github.com/apache/airflow/pull/51699/files#diff-c076ff9d88eb776c2f874ea4fbb66c9d090e080ed5c9b80c2c36dcb3a51360d3R100
The error is not visible without a deeper check of the DAG processor logs,
because the DAG is simply excluded from the run and not visible in the UI.
As a solution for our tests, for now we have 2 approaches:
1. add `.value` to the values we want to pass to the operator (see the sketch
after this list)
2. remove the whole parameter from the `templated_fields` of the called
operator, which is also not the best solution, since in some operators those
fields are actually really needed for XCom
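A minimal sketch of approach 1, assuming the `google-cloud-redis` client's
import path for `FailoverInstanceRequest`:
```
# Approach 1 (sketch): unwrap the enum member to its primitive value before
# passing it to the operator, so only an int has to cross the DAG-processor
# serialization boundary.
from google.cloud.redis_v1 import FailoverInstanceRequest

mode_value = FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS.value
print(mode_value)  # a plain int, which msgpack can serialize
```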
So the way forward is to discuss with all the providers, if possible, what
the best solution would be for each of us. For now we were able to catch those
errors in our tests, but I think it is a common problem for a lot of users.
The idea from Jarek was to introduce (as I understood it, please correct me) a
serializer in Airflow itself for every type of object
(https://github.com/VladaZakharova/airflow/pull/280#issuecomment-3783657448).
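To make the idea concrete, here is a hypothetical sketch (NOT existing Airflow
code) of what a generic fallback in the `_msgpack_enc_hook` from the traceback
above could look like:
```
# Hypothetical sketch of a generic fallback in the msgpack enc_hook seen in
# the traceback (airflow/sdk/execution_time/comms.py). NOT existing Airflow
# code: unwrap enum members to their primitive values instead of raising.
import enum
from typing import Any


def _msgpack_enc_hook(obj: Any) -> Any:
    if isinstance(obj, enum.Enum):
        # proto.Enum and most stdlib enums carry a primitive .value
        return obj.value
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
```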
Here I have a couple of "what ifs", because as we saw, there are a bunch of
Enums from different packages, like `proto` or `_messages`, and some of them
already have deserializers that can be used. But even if we use those, can we
add them as dependencies of Airflow itself to be able to implement this?
So I think this is an open discussion for ideas on how to implement a
deserializer that will suit everyone's needs.
@potiuk @shahar1 @kaxil
### What you think should happen instead?
One of 3 scenarios that I can imagine for now:
1. Show the error in the UI when parsing has failed. Like, "The DAG bla-bla was
not parsed due to incorrect serialization. Please recheck your DAG definition
and rerun"
2. Ask people not to add every possible field to `templated_fields` while
implementing operators, but only those that can be easily serialized
3. Fix the DAG processor so it can actually serialize those types?
### How to reproduce
Run the system test for memorystore_redis with this operator:
```
failover_instance = CloudMemorystoreFailoverInstanceOperator(
    task_id="failover-instance",
    location=LOCATION,
    instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
    data_protection_mode=FailoverInstanceRequest.DataProtectionMode(
        FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS.value  # type: ignore[attr-defined]
    ),
    project_id=PROJECT_ID,
)
```
The DAG will not be parsed at all and will not be shown in the UI.
### Operating System
ubuntu
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)