sjyangkevin commented on PR #52360:
URL: https://github.com/apache/airflow/pull/52360#issuecomment-3017910224

   Hi @bolkedebruin, @potiuk, @amoghrajesh, I would appreciate it if you could 
review this PR again. I also want to share some findings from my testing.
   
   ### Issue with serializing `numpy.bool`
   
   It looks like the `serialize` function in the `numpy.py` module fails to 
serialize `numpy.bool` objects. I think this is because the qualified name 
(i.e., `numpy.bool`) does not match any entry in the `serializers` list. Adding 
a `numpy.bool` entry to the list resolved the issue locally, but it caused the 
following test to fail: 
`airflow-core/tests/unit/serialization/test_serde.py::TestSerDe::test_serializers_importable_and_str`.
 Details can be found here: 
https://github.com/apache/airflow/actions/runs/15964146039/job/45021686182.
   
   ```python
   serializers = [
       "numpy.int8",
       "numpy.int16",
       "numpy.int32",
       "numpy.int64",
       "numpy.uint8",
       "numpy.uint16",
       "numpy.uint32",
       "numpy.uint64",
       "numpy.bool_",
       "numpy.float64",
       "numpy.float16",
       "numpy.complex128",
       "numpy.complex64",
   ]
   ```
   ![Screenshot from 2025-06-30 
01-31-05](https://github.com/user-attachments/assets/43acdd6b-4763-49d7-8ce3-81cfab8d07ab)
   
   ![Screenshot from 2025-06-30 
00-29-47](https://github.com/user-attachments/assets/cad71741-e6db-4ce2-bde4-896de1c9be62)
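
For reference, the mismatch described above can be reproduced without Airflow. The snippet below is a minimal sketch, not Airflow's actual lookup code: `qualified_name` and `is_registered` are hypothetical helpers, and `FakeNumpyBool` is a stand-in class that reports itself as `numpy.bool` so the example runs without NumPy installed. It assumes the serializer is selected by comparing a class's `module.qualname` string against the registered names.

```python
# Subset of the names registered in the `serializers` list above.
REGISTERED = {"numpy.bool_", "numpy.float64", "numpy.int64"}


def qualified_name(cls: type) -> str:
    """Build 'module.QualName' for a class (hypothetical helper)."""
    return f"{cls.__module__}.{cls.__qualname__}"


def is_registered(cls: type) -> bool:
    """Return True when the class's qualified name is in REGISTERED."""
    return qualified_name(cls) in REGISTERED


# Stand-in for a class whose qualified name is `numpy.bool` (assumption:
# this mirrors what the failing object reports; it is not a NumPy type).
FakeNumpyBool = type("bool", (), {"__module__": "numpy"})

print(qualified_name(FakeNumpyBool))  # numpy.bool
print(is_registered(FakeNumpyBool))   # False: only "numpy.bool_" is listed
```

With NumPy installed, calling `qualified_name(np.bool_)` shows which name the installed version actually reports.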
   
   ### Issue with a unit test case
   I also hit a failing test case when I ran pytest in the breeze environment 
on `airflow-core/tests/unit/serialization`. It looks like the DAG is missing, 
and I'm not sure whether some cleanup is needed for this. I didn't see this 
failure when running `breeze testing core-tests --test-type Serialization`.
   
   ![Screenshot from 2025-06-30 
01-27-00](https://github.com/user-attachments/assets/eee909a8-10f7-4191-b501-f037c199783b)
   
   
   ### Test DAG code
   I also updated the DAG code to exercise as many object types as possible 
through serialize/deserialize, except for iceberg and deltalake.
   
   ```python
   from airflow.decorators import dag, task
   from pendulum import datetime
   
   @dag(
       start_date=datetime(2025, 5, 23),
       schedule=None,
       catchup=False,
       tags=["serialization", "pydantic", "airflow"],
   )
   def pydantic_serde():
   
       # 1. Pandas DataFrame
       @task
       def get_pandas():
           import pandas as pd
           import numpy as np
           df = pd.DataFrame(np.random.randn(3, 2), columns=list("AB"))
           return df
   
       @task
       def print_pandas(df):
           print("Pandas DataFrame:", df)
   
       # 2. Decimal
       @task
       def get_bignum():
           from decimal import Decimal
           return Decimal(1) / Decimal(7)
   
       @task
       def print_bignum(n):
           print("Decimal:", n, type(n))
   
       # 3. Built-in collections
       @task
       def get_all_builtins():
           return {
               "list": [1, 2, 3],
               "set": {4, 5},
               "tuple": (6, 7),
               "frozenset": frozenset([8, 9])
           }
   
       @task
       def print_all_builtins(obj):
           print("Built-in Types:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 4. NumPy scalar types - integers
       @task
       def get_numpy_ints():
           import numpy as np
           return {
               "int8": np.int8(8),
               "int16": np.int16(16),
               "int32": np.int32(32),
               "int64": np.int64(64),
               "uint8": np.uint8(8),
               "uint16": np.uint16(16),
               "uint32": np.uint32(32),
               "uint64": np.uint64(64),
           }
   
       @task
       def print_numpy_ints(obj):
           print("NumPy Integers:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 5. NumPy scalar types - misc
       @task
       def get_numpy_misc():
           import numpy as np
           return {
               "bool_": np.bool_(0),
               "float16": np.float16(0.125),
               "float64": np.float64(3.14159),
               "complex64": np.complex64(1 + 2j),
               "complex128": np.complex128(3 + 4j),
           }
   
       @task
       def print_numpy_misc(obj):
           print("NumPy Misc Types:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 6. Python datetime types
       @task
       def get_python_datetime_types():
           import datetime
           return {
               "date": datetime.date(2025, 6, 29),
               "datetime": datetime.datetime(2025, 6, 29, 12, 34, 56),
               "timedelta": datetime.timedelta(days=1, seconds=3600)
           }
   
       @task
       def print_python_datetime_types(obj):
           print("Python datetime types:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 7. Pendulum datetime
       @task
       def get_pendulum_datetime_type():
           import pendulum
           dt = pendulum.datetime(2025, 6, 29, 12, 34, 56, tz="Europe/Paris")
           return dt
   
       @task
       def print_pendulum_datetime_type(dt):
           print("Pendulum DateTime:", dt, type(dt))
   
       # 8. Timezone-aware datetime (ZoneInfo)
       @task
       def get_timezone_aware():
           from zoneinfo import ZoneInfo
           from datetime import datetime as dt
           return dt(2025, 6, 29, 12, 0, tzinfo=ZoneInfo("America/New_York"))
   
       @task
       def print_timezone_aware(tz_dt):
           print("Timezone-aware datetime:", tz_dt, type(tz_dt))
   
       # 9. Pendulum timezone object
       @task
       def get_pendulum_tz():
           import pendulum
           return pendulum.timezone("Asia/Tokyo")
   
       @task
       def print_pendulum_tz(tz):
           print("Pendulum timezone:", tz, type(tz))
   
       # 10. ZoneInfo timezone object
       @task
       def get_zoneinfo_tz():
           from zoneinfo import ZoneInfo
           return ZoneInfo("America/Toronto")
       
       @task
       def print_zoneinfo_tz(tz):
           print("ZoneInfo timezone:", tz, type(tz))
   
       # 11. Cohere embeddings (Pydantic model)
       @task
       def get_embeddings():
           import pydantic
           from airflow.providers.cohere.hooks.cohere import CohereHook
   
           hook = CohereHook()
           embeddings = hook.create_embeddings(["gruyere"])
   
           print("Cohere embeddings type:", type(embeddings))
           print("Is Pydantic model?", isinstance(embeddings, pydantic.BaseModel))
           return embeddings
   
       @task
       def print_embeddings(obj):
           print("Cohere Embeddings (Pydantic Model):", obj)
   
       # DAG chaining
       print_pandas(get_pandas())
       print_bignum(get_bignum())
       print_all_builtins(get_all_builtins())
       print_numpy_ints(get_numpy_ints())
       print_numpy_misc(get_numpy_misc())
       print_python_datetime_types(get_python_datetime_types())
       print_pendulum_datetime_type(get_pendulum_datetime_type())
       print_timezone_aware(get_timezone_aware())
       print_pendulum_tz(get_pendulum_tz())
       print_zoneinfo_tz(get_zoneinfo_tz())
       print_embeddings(get_embeddings())
   
   pydantic_serde()
   ```
   
   ### DAG Test Results
   I modified the Cohere provider in the breeze environment so that it returns 
a Pydantic class. 1.) When the Pydantic model is in the whitelist, all the 
tests defined in the DAG passed. 2.) When the Pydantic model is removed from 
the whitelist, the `print_embeddings` task failed with an `ImportError`.
   
   To whitelist the Pydantic model, add the following to 
`files/airflow-breeze-config/environment_variables.env`:
   ```txt
   AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES=cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings
   ```
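
To illustrate why the task fails once the class is removed from the whitelist, here is a minimal sketch of an allow-list check. It assumes glob-style matching of fully qualified class names. `is_allowed` and `check_allowed` are hypothetical helpers and the error message is illustrative, so this is not Airflow's actual implementation.

```python
from fnmatch import fnmatchcase

# Patterns as they might be read from the setting above (assumption).
ALLOWED_PATTERNS = [
    "cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings",
]


def is_allowed(classname: str, patterns=ALLOWED_PATTERNS) -> bool:
    """Return True when classname matches at least one glob pattern."""
    return any(fnmatchcase(classname, pattern) for pattern in patterns)


def check_allowed(classname: str, patterns=ALLOWED_PATTERNS) -> None:
    """Raise ImportError for classes outside the allow list."""
    if not is_allowed(classname, patterns):
        raise ImportError(f"{classname} is not in the deserialization allow list")


name = "cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings"
check_allowed(name)                     # passes: exact match
check_allowed(name, ["cohere.types.*"])  # passes: glob match
try:
    check_allowed(name, [])             # empty allow list
except ImportError as exc:
    print("rejected:", exc)
```

The last call mirrors case 2.) above: with no matching pattern, deserialization is refused with an `ImportError`.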
   
   ![Screenshot from 2025-06-30 
00-12-07](https://github.com/user-attachments/assets/48f7ae3e-be43-4895-96d0-21a9a6bc9428)
   The **numpy** check passed because I added `numpy.bool` to the 
`serializers` list locally. That change is not included in this PR because it 
failed the checks.
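
Judging by its name, the failing test checks that every entry in `serializers` can be imported. A minimal sketch of such a check follows. `is_importable` is a hypothetical helper, not the test's actual code. Running it in a given environment shows whether `numpy.bool` resolves for the NumPy version installed there.

```python
from importlib import import_module


def is_importable(dotted_name: str) -> bool:
    """Return True when 'package.module.Attr' resolves to an attribute."""
    module_name, _, attr = dotted_name.rpartition(".")
    if not module_name:
        return False
    try:
        module = import_module(module_name)
    except ImportError:
        # Module is missing entirely (e.g. NumPy is not installed).
        return False
    return hasattr(module, attr)


# The first two entries use the standard library so the sketch runs anywhere;
# the NumPy results depend on the installed version.
for name in ["decimal.Decimal", "decimal.DoesNotExist", "numpy.bool_", "numpy.bool"]:
    print(name, is_importable(name))
```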
   
   ![Screenshot from 2025-06-30 
00-14-53](https://github.com/user-attachments/assets/e1304f42-d9d3-40e3-bc8c-9582ad542818)
   

