molcay commented on issue #60941:
URL: https://github.com/apache/airflow/issues/60941#issuecomment-3789752053

   Hi,
   
To give providers a way to extend the encoding of provider-related types (enums, dataclasses, etc.):
- First, we need to introduce a new field in the `provider.yaml` files (a similar approach to `secret-backends`). Let's call it `encoder-backends` (this is just an example; I am open to naming suggestions).
- This field will hold a list of `EncoderBackend` implementations, which can live in the provider source code.
- The `EncoderBackend` implementations need to **extend** the `BaseEncoderBackend` class (defined below) and **override** the `encode` method.
- The `encode` method needs to have the following structure:
```python
def encode(self, obj: Any) -> tuple[bool, Any]:
    # TODO: provider-related encoding goes here

    return False, None
```
> By default, returning `(False, None)` from the `encode` method means that the given `EncoderBackend` implementation has no match for the object (instead of raising a `NotImplementedError`).
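
To make the `(is_encoded, value)` contract concrete, here is a minimal, self-contained sketch. `BaseEncoderBackend` below is only a local stand-in for the proposed class (it does not exist in Airflow yet), and `ExampleProviderEncoderBackend`, `Color` and `Point` are hypothetical names covering the enum and dataclass cases mentioned above.

```python
from __future__ import annotations

import dataclasses
import enum
from typing import Any


class BaseEncoderBackend:
    """Local stand-in for the proposed base class (not an existing Airflow API)."""

    def encode(self, obj: Any) -> tuple[bool, Any]:
        raise NotImplementedError()


class ExampleProviderEncoderBackend(BaseEncoderBackend):
    """Hypothetical provider backend handling enum members and dataclass instances."""

    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, enum.Enum):
            return True, obj.value
        # is_dataclass() is also True for dataclass *types*, so only encode instances
        if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
            return True, dataclasses.asdict(obj)
        # Default case: no match, so the next backend gets a chance
        return False, None


class Color(enum.Enum):
    RED = "red"


@dataclasses.dataclass
class Point:
    x: int
    y: int


backend = ExampleProviderEncoderBackend()
print(backend.encode(Color.RED))    # (True, 'red')
print(backend.encode(Point(1, 2)))  # (True, {'x': 1, 'y': 2})
print(backend.encode(object()))     # (False, None)
```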
   
   ### From Airflow Core / Task SDK side
- The `BaseEncoderBackend` and `DefaultEncoderBackend` classes can be defined as follows, replacing [this section of the code](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/comms.py#L113-L132):
```python
from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import Any

import msgspec
from pydantic import BaseModel


class BaseEncoderBackend:
    def encode_hook(self, obj: Any) -> Any:
        # Proposed API: `get_encoder_backends()` does not exist yet; it would return the
        # classes listed under `encoder-backends` in each provider.yaml
        from airflow.providers_manager import ProvidersManager

        encoders: list[type[BaseEncoderBackend]] = [
            *ProvidersManager().get_encoder_backends(),
            DefaultEncoderBackend,
        ]
        for encoder in encoders:
            try:
                is_encoded, value = encoder().encode(obj)
            except NotImplementedError:
                # The implementation class does not override `encode` or raises NotImplementedError
                continue
            if is_encoded:
                # Return right away, so a value that legitimately encodes to None is not treated as a miss
                return value

        # Raise a NotImplementedError for other types (msgspec expects this from an enc_hook)
        raise NotImplementedError(f"Objects of type {type(obj)} are not supported")

    def encode(self, obj: Any) -> tuple[bool, Any]:
        """
        Encode (serialize) the given object.

        Overrides of this method need to return whether the encoding happened, alongside the encoded value.
        Overrides of this method need to return ``(False, None)`` as the default return statement.
        """
        raise NotImplementedError()


class DefaultEncoderBackend(BaseEncoderBackend):
    """The default implementation to fall back on if no provider-specific encoder backend matches."""

    def encode(self, obj: Any) -> tuple[bool, Any]:
        import pendulum

        if isinstance(obj, pendulum.DateTime):
            # Convert the pendulum DateTime subclass into a raw datetime so that msgspec can use its native
            # encoding
            return True, datetime(
                obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, tzinfo=obj.tzinfo
            )
        if isinstance(obj, Path):
            return True, str(obj)
        if isinstance(obj, BaseModel):
            return True, obj.model_dump(exclude_unset=True)

        return False, None


def _new_encoder() -> msgspec.msgpack.Encoder:
    return msgspec.msgpack.Encoder(enc_hook=BaseEncoderBackend().encode_hook)
```
   
- As you can see, `BaseEncoderBackend` gets the `encoder-backends` config from [`ProvidersManager`](https://github.com/apache/airflow/blob/9ce2200d04ca16541a8189e65c8bf7e79e354eaa/airflow-core/src/airflow/providers_manager.py#L368), which retrieves the `encoder-backends` of all defined providers. It then loops through the encoders and returns the value from the first one that manages to encode the object. If none of them does, it falls back to `DefaultEncoderBackend`, and if that one does not encode the value either, a `NotImplementedError` is raised and shows up in the logs.
> I am not including the implementation details for `ProvidersManager` for the sake of simplicity; it would be very similar to [this](https://github.com/apache/airflow/blob/9ce2200d04ca16541a8189e65c8bf7e79e354eaa/airflow-core/src/airflow/providers_manager.py#L562-L565).
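
The dispatch loop can be exercised without Airflow or msgspec. In this rough sketch, the `provider_backends` argument stands in for the proposed `ProvidersManager().get_encoder_backends()` call, the default backend is trimmed down to the `Path` case, and all other backend classes are hypothetical. It also shows that the order of the list decides which backend wins when two backends handle the same type.

```python
from __future__ import annotations

import enum
from pathlib import Path
from typing import Any


class BaseEncoderBackend:
    def encode(self, obj: Any) -> tuple[bool, Any]:
        raise NotImplementedError()


class DefaultEncoderBackend(BaseEncoderBackend):
    # Trimmed-down default: only the Path case from the proposal
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, Path):
            return True, str(obj)
        return False, None


class EnumByValueBackend(BaseEncoderBackend):
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, enum.Enum):
            return True, obj.value
        return False, None


class EnumByNameBackend(BaseEncoderBackend):
    def encode(self, obj: Any) -> tuple[bool, Any]:
        if isinstance(obj, enum.Enum):
            return True, obj.name
        return False, None


class IncompleteBackend(BaseEncoderBackend):
    """Does not override encode(), so the base NotImplementedError is raised and the backend is skipped."""


def encode_hook(obj: Any, provider_backends: list[type[BaseEncoderBackend]]) -> Any:
    # provider_backends stands in for ProvidersManager().get_encoder_backends()
    for encoder in [*provider_backends, DefaultEncoderBackend]:
        try:
            is_encoded, value = encoder().encode(obj)
        except NotImplementedError:
            continue
        if is_encoded:
            return value  # first match wins
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")


class Status(enum.Enum):
    DONE = 2


backends = [IncompleteBackend, EnumByValueBackend, EnumByNameBackend]
print(encode_hook(Status.DONE, backends))          # 2
print(encode_hook(Status.DONE, backends[::-1]))    # DONE
print(encode_hook(Path("report.csv"), backends))   # report.csv
```

Because the first matching backend wins, the order in which `ProvidersManager` returns the backends would need to be well defined.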
   
   ### From Provider Perspective
- Here is a sample implementation for a provider:
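```python
# providers/google/src/airflow/providers/google/encoder_backends.py
from __future__ import annotations

from typing import Any

# Proposed location: the section of comms.py that this proposal replaces
from airflow.sdk.execution_time.comms import BaseEncoderBackend


class GoogleEncoderBackend(BaseEncoderBackend):
    def encode(self, obj: Any) -> tuple[bool, Any]:
        import proto

        if isinstance(obj, proto.Enum):  # encode the expected proto.Enum type
            return True, obj.value

        return False, None
```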
- Also, add the definition to the `provider.yaml` file:
   ```yaml
   # providers/google/provider.yaml
   ...
   encoder-backends:
     - airflow.providers.google.encoder_backends.GoogleEncoderBackend
   ```
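
Since the `ProvidersManager` details are left out above, here is one possible way the `encoder-backends` entries could be collected and imported. This is an assumption-laden sketch, not the real `ProvidersManager` code: `load_encoder_backends` is a hypothetical helper, the provider info dicts mimic parsed `provider.yaml` content, and a throwaway module is registered in `sys.modules` only so that the dotted path is importable in the demo.

```python
from __future__ import annotations

import importlib
import sys
import types
from typing import Any


class BaseEncoderBackend:
    def encode(self, obj: Any) -> tuple[bool, Any]:
        raise NotImplementedError()


def load_encoder_backends(provider_infos: list[dict[str, Any]]) -> list[type[BaseEncoderBackend]]:
    """Import every class path listed under `encoder-backends`, keeping the provider order."""
    backends: list[type[BaseEncoderBackend]] = []
    for info in provider_infos:
        for class_path in info.get("encoder-backends", []):
            module_name, _, class_name = class_path.rpartition(".")
            cls = getattr(importlib.import_module(module_name), class_name)
            # Reject entries that do not follow the BaseEncoderBackend contract
            if not (isinstance(cls, type) and issubclass(cls, BaseEncoderBackend)):
                raise TypeError(f"{class_path} is not a BaseEncoderBackend subclass")
            backends.append(cls)
    return backends


class DemoEncoderBackend(BaseEncoderBackend):
    def encode(self, obj: Any) -> tuple[bool, Any]:
        return (True, obj.hex()) if isinstance(obj, bytes) else (False, None)


# Demo only: register a module so "demo_encoder_backends.DemoEncoderBackend" can be imported
demo_module = types.ModuleType("demo_encoder_backends")
demo_module.DemoEncoderBackend = DemoEncoderBackend
sys.modules["demo_encoder_backends"] = demo_module

provider_infos = [
    {
        "package-name": "apache-airflow-providers-demo",
        "encoder-backends": ["demo_encoder_backends.DemoEncoderBackend"],
    },
    {"package-name": "apache-airflow-providers-other"},  # declares no encoder backends
]
print(load_encoder_backends(provider_infos))
```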
   
With this approach, providers can handle the encoding of their own specific types (like `proto.Enum` for the Google provider).
   
I can imagine one downside: if the same type is handled by two different encoder backends that encode the value differently, which one should take precedence?
   
I should also mention that I am not very familiar with how the different Airflow components are initialized or work together. My approach may be too vague or inaccurate, so I am open to discussion.

