molcay commented on issue #60941:
URL: https://github.com/apache/airflow/issues/60941#issuecomment-3789752053
Hi,
To give providers some extensibility for encoding provider-related types (enums, dataclasses, etc.):
- First, we need to introduce a new field in the `provider.yaml` files (a similar approach to `secret-backends`). Let's call it `encoder-backends` (this is just an example; I am open to naming suggestions).
- This field will hold a list of `EncoderBackend` implementations which can
be placed in the provider source code.
- The `EncoderBackend` implementations need to **extend**
`BaseEncoderBackend` class (defined below) and **override** the `encode` method.
- The `encode` method needs to have the following structure:
```python
from typing import Any


def encode(self, obj: Any) -> tuple[bool, Any]:
    # TODO: provider-related encoding goes here
    return False, None
```
> As the default case, returning `(False, None)` from the `encode` method means that the given `EncoderBackend` implementation found no match (instead of raising a `NotImplementedError`).
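The `(matched, value)` tuple matters because a backend may legitimately encode an object *to* `None`, so the flag, not the value, is what separates "handled" from "not my type". A minimal stdlib-only sketch, independent of Airflow; `NoneSentinel`, both backend classes and `first_match` are hypothetical names used only for illustration:

```python
from __future__ import annotations

from typing import Any


class NoneSentinel:
    """Hypothetical provider type that should be serialized as null."""


class SentinelBackend:
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, NoneSentinel):
            # A successful match whose encoded value happens to be None
            return True, None
        # Not our type: report "no match" instead of raising
        return False, None


class SetBackend:
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, (set, frozenset)):
            # Sets are not serializable as-is, so emit a sorted list
            return True, sorted(obj)
        return False, None


def first_match(obj: Any, backends: list[Any]) -> tuple[bool, Any]:
    # Stop at the first backend that reports a match, trusting the flag, not the value
    for backend in backends:
        matched, value = backend.encode(obj)
        if matched:
            return True, value
    return False, None
```

Here `first_match(NoneSentinel(), [SentinelBackend(), SetBackend()])` returns `(True, None)` while `first_match(1.5, ...)` returns `(False, None)`; a plain `value is not None` check could not tell those two cases apart.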
### From Airflow Core / Task SDK side
- The `BaseEncoderBackend` and `DefaultEncoderBackend` classes can be defined as follows, replacing [this section of the code](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L113-L132):
```python
from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import Any

import msgspec
from pydantic import BaseModel

from airflow.providers_manager import ProvidersManager


class BaseEncoderBackend:
    def encode_hook(self, obj: Any) -> Any:
        # Provider backends are tried first; the default backend is the fallback.
        # NOTE: `get_encoder_backends()` is the proposed ProvidersManager method (it does not exist yet).
        encoders: list[type[BaseEncoderBackend]] = [
            *ProvidersManager().get_encoder_backends(),
            DefaultEncoderBackend,
        ]
        for encoder in encoders:
            try:
                is_encoded, value = encoder().encode(obj)
            except NotImplementedError:
                # The implementation class does not override `encode`
                # (or explicitly raises NotImplementedError)
                continue
            if is_encoded:
                # Return on the first match, even if the encoded value is None
                return value
        # Raise a NotImplementedError for other types
        raise NotImplementedError(f"Objects of type {type(obj)} are not supported")

    def encode(self, obj: Any) -> tuple[bool, Any]:
        """
        Encode (serialize) the given object.

        Overrides of this method must return whether the object was encoded,
        together with the encoded value. If the object is not handled,
        they must return ``(False, None)``.
        """
        raise NotImplementedError()


class DefaultEncoderBackend(BaseEncoderBackend):
    """The default fallback implementation, used when no provider-specific encoder backend matches."""

    def encode(self, obj: Any) -> tuple[bool, Any]:
        import pendulum

        if isinstance(obj, pendulum.DateTime):
            # Convert the pendulum DateTime subclass into a raw datetime so that
            # msgspec can use its native encoding
            return True, datetime(
                obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, tzinfo=obj.tzinfo
            )
        if isinstance(obj, Path):
            return True, str(obj)
        if isinstance(obj, BaseModel):
            return True, obj.model_dump(exclude_unset=True)
        return False, None


def _new_encoder() -> msgspec.msgpack.Encoder:
    return msgspec.msgpack.Encoder(enc_hook=BaseEncoderBackend().encode_hook)
```
- As you can see, `BaseEncoderBackend` gets the `encoder-backends` configuration from [`ProvidersManager`](https://github.com/apache/airflow/blob/9ce2200d04ca16541a8189e65c8bf7e79e354eaa/airflow-core/src/airflow/providers_manager.py#L368), which retrieves the `encoder-backends` defined by all providers. We then loop through each encoder and try to encode the value. If none of them succeeds, we fall back to `DefaultEncoderBackend`, and if that does not encode it either, a `NotImplementedError` is raised and shows up in the logs.
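The lookup order described above can be exercised without Airflow by swapping `ProvidersManager().get_encoder_backends()` for a plain list. A minimal sketch under that assumption; `make_enc_hook`, `ProviderComplexBackend` and `NotOverriddenBackend` are hypothetical names, and the default backend is trimmed down to the `Path` case:

```python
from __future__ import annotations

from pathlib import Path
from typing import Any, Callable


class BaseEncoderBackend:
    def encode(self, obj: Any) -> tuple[bool, Any]:
        raise NotImplementedError()


class DefaultEncoderBackend(BaseEncoderBackend):
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, Path):
            return True, str(obj)
        return False, None


class ProviderComplexBackend(BaseEncoderBackend):
    """Hypothetical provider backend that encodes complex numbers as [real, imag]."""

    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, complex):
            return True, [obj.real, obj.imag]
        return False, None


class NotOverriddenBackend(BaseEncoderBackend):
    """Forgets to override encode(); the hook skips it via NotImplementedError."""


def make_enc_hook(provider_backends: list[type[BaseEncoderBackend]]) -> Callable[[Any], Any]:
    # Provider backends first, the default backend as the last resort
    encoders = [*provider_backends, DefaultEncoderBackend]

    def enc_hook(obj: Any) -> Any:
        for encoder in encoders:
            try:
                is_encoded, value = encoder().encode(obj)
            except NotImplementedError:
                continue
            if is_encoded:
                return value
        raise NotImplementedError(f"Objects of type {type(obj)} are not supported")

    return enc_hook
```

With `hook = make_enc_hook([NotOverriddenBackend, ProviderComplexBackend])`, `hook(1 + 2j)` is handled by the provider backend, `hook(Path("data.csv"))` falls through to the default backend, and `hook(object())` raises `NotImplementedError`.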
> I am not including the implementation details for `ProvidersManager` for the sake of simplicity; it would be very similar to [this](https://github.com/apache/airflow/blob/9ce2200d04ca16541a8189e65c8bf7e79e354eaa/airflow-core/src/airflow/providers_manager.py#L562-L565).
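For readers who want a rough picture of that omitted loading step, here is a hypothetical sketch: it turns the dotted paths listed under `encoder-backends` into classes with `importlib` and rejects anything that does not extend the expected base class. `load_encoder_backends` is an illustrative name, not an existing Airflow function, and a real `ProvidersManager` implementation would also deal with caching and error reporting:

```python
from __future__ import annotations

import importlib
from typing import Iterable


def load_encoder_backends(dotted_paths: Iterable[str], base: type) -> list[type]:
    """Import each 'package.module.ClassName' path and check that it subclasses `base`."""
    classes: list[type] = []
    for path in dotted_paths:
        module_name, _, class_name = path.rpartition(".")
        if not module_name:
            raise ValueError(f"{path!r} is not a fully qualified class path")
        cls = getattr(importlib.import_module(module_name), class_name)
        if not (isinstance(cls, type) and issubclass(cls, base)):
            raise TypeError(f"{path!r} does not extend {base.__name__}")
        classes.append(cls)
    return classes
```

Using stdlib stand-ins, `load_encoder_backends(["collections.OrderedDict"], base=dict)` returns the `OrderedDict` class; in the proposal the base would be `BaseEncoderBackend` and the paths would come from each provider's `provider.yaml`.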
### From Provider Perspective
- Here is a sample implementation for a provider:
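```python
# providers/google/src/airflow/providers/google/encoder_backends.py
from __future__ import annotations

from typing import Any

# Proposed base class from the Task SDK section above (not part of Airflow yet)
from airflow.sdk.execution_time.comms import BaseEncoderBackend


class GoogleEncoderBackend(BaseEncoderBackend):
    def encode(self, obj: Any) -> tuple[bool, Any]:
        import proto

        if isinstance(obj, proto.Enum):  # encode the expected proto.Enum type
            return True, obj.value
        return False, None
```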
- Then add the definition to the provider's `provider.yaml` file:
```yaml
# providers/google/provider.yaml
...
encoder-backends:
- airflow.providers.google.encoder_backends.GoogleEncoderBackend
```
With this approach, providers can handle the encoding of their own specific types (like `proto.Enum` for the Google provider).
I can imagine one downside: if the same type is handled by two different encoder backends that encode the value differently, which one should take precedence?
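With the first-match loop in `encode_hook`, the current answer is simply "whichever backend comes first in the list", i.e. whatever order `get_encoder_backends()` returns. One hypothetical way to at least surface such overlaps (for example in a provider test) is to ask every backend and report all that claim the object. `find_overlapping_backends`, `ValueBackend`, `NameBackend` and `Color` are illustrative names only:

```python
from __future__ import annotations

from enum import IntEnum
from typing import Any


class Color(IntEnum):
    RED = 1


class ValueBackend:
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, IntEnum):
            return True, obj.value  # encodes the enum as its integer value
        return False, None


class NameBackend:
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, IntEnum):
            return True, obj.name  # encodes the same enum as its member name
        return False, None


def find_overlapping_backends(obj: Any, backends: list[Any]) -> dict[str, Any]:
    """Map each matching backend's class name to the value it would produce for `obj`."""
    matches: dict[str, Any] = {}
    for backend in backends:
        matched, value = backend.encode(obj)
        if matched:
            matches[type(backend).__name__] = value
    return matches
```

`find_overlapping_backends(Color.RED, [ValueBackend(), NameBackend()])` yields two entries with different values, which is exactly the ambiguity above; the first-match loop would silently pick `ValueBackend` because it comes first.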
I also want to mention that I am not very familiar with how the different Airflow components are initialized or how they work together, so my approach may be too fuzzy or inaccurate. I am open to discussion.