sjyangkevin commented on issue #52753:
URL: https://github.com/apache/airflow/issues/52753#issuecomment-3092694495

   Hi @Lee-W, @potiuk, I had a look into it, and it seems there are issues with both pandas and numpy after upgrading numpy from `1.26.4` to `2.2.6`. Below is how I set up the Breeze environment; please correct me if anything is wrong with the approach.
   
   ### Breeze Setup
   
   First, I downloaded the numpy==2.2.6 wheel into `dist`.
   ```
   pip download numpy==2.2.6 --dest dist --no-deps
   ```
   
   ```
   numpy-2.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
   ```
   Then, I ran the following command to start airflow locally.
   ```
   breeze start-airflow --python 3.12 --db-reset --auth-manager FabAuthManager --use-distributions-from-dist
   ```
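One thing I'm not sure about: the wheel filename encodes the interpreter it was built for. Under the wheel naming convention `{distribution}-{version}(-{build tag})?-{python tag}-{abi tag}-{platform tag}.whl`, `cp310` means CPython 3.10, but Breeze was started with `--python 3.12`, so this particular wheel may not be installable in that environment. Here is a small stdlib-only check. `wheel_python_tags` is a hypothetical helper written just for this comment:

```python
import sys


def wheel_python_tags(filename: str) -> list[str]:
    """Return the python tag(s) of a wheel filename, e.g. ['cp310'].

    Wheel names follow {dist}-{version}(-{build})?-{python}-{abi}-{platform}.whl,
    so the python tag is always the third hyphen-separated field from the end.
    """
    stem = filename.removesuffix(".whl")
    python_tag = stem.split("-")[-3]
    # compressed tag sets such as "py2.py3" are dot-separated
    return python_tag.split(".")


wheel = "numpy-2.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl"
tags = wheel_python_tags(wheel)
target = "cp312"  # the interpreter passed to breeze via --python 3.12
print(tags, target in tags, f"host: cp{sys.version_info.major}{sys.version_info.minor}")
```

If the mismatch does matter, `pip download` also accepts `--python-version 3.12 --only-binary=:all:` to fetch a wheel for an interpreter other than the one on the host.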
   
   ### Issues with numpy
   
   For **numpy**, after upgrading to `2.2.6`, `qualname(o)`, where `o` is a `np.bool_(False)` object, resolves to `numpy.bool` (the class prints as `<class 'numpy.bool'>`). In numpy `1.26.4`, it resolves to `numpy.bool_`, which is the qualified name the numpy serializer checks for, so numpy 2 bools no longer seem to match. To reconstruct the object in the deserializer, I think we should still call `numpy.bool_`, because it exists in both numpy 1.x and 2.x. I think we need to update the logic to something like the following.
   
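   ```python
   import numpy as np

   serializers = [
       "numpy.int8",
       "numpy.int16",
       "numpy.int32",
       "numpy.int64",
       "numpy.uint8",
       "numpy.uint16",
       "numpy.uint32",
       "numpy.uint64",
       "numpy.float64",
       "numpy.float16",
       "numpy.complex128",
       "numpy.complex64",
   ]

   # numpy 2.x names the bool scalar type `numpy.bool`; numpy 1.x names it `numpy.bool_`
   if int(np.__version__.split(".")[0]) >= 2:
       serializers += ["numpy.bool"]
   else:
       serializers += ["numpy.bool_"]


   def deserialize(classname: str, version: int, data: object) -> object:
       # `numpy.bool_` exists in both numpy 1.x and 2.x (`numpy.bool` is missing
       # in 1.24 to 1.26), so always rebuild bools through `numpy.bool_`
       if classname in ("numpy.bool", "numpy.bool_"):
           return np.bool_(data)
       ...  # rest of the existing deserialization logic unchanged
   ```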
   
   
https://github.com/apache/airflow/blob/11f512127917b781bba4d5de7dc336f9ffb56e9d/airflow-core/src/airflow/serialization/serializers/numpy.py#L34
   
   <img width="1437" height="687" alt="Image" src="https://github.com/user-attachments/assets/e42d6058-dad0-4eba-93b9-511cb5753d9f" />
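The name change can be reproduced outside Airflow. The sketch below rebuilds an instance's dotted class path from `type(o).__module__` and `type(o).__qualname__`, which is my assumption of what `qualname` effectively returns for an instance (`instance_qualname` is a hypothetical helper, not Airflow's API). It then checks that `np.bool_` can rebuild the value under either major version:

```python
import numpy as np


def instance_qualname(o: object) -> str:
    # hypothetical helper: dotted path of the object's class, e.g. "numpy.int8"
    cls = type(o)
    return f"{cls.__module__}.{cls.__qualname__}"


numpy_major = int(np.__version__.split(".")[0])
bool_name = instance_qualname(np.bool_(False))
print(np.__version__, bool_name)  # "numpy.bool_" on 1.x, "numpy.bool" on 2.x

# a serializer would store the plain Python value ...
payload = bool(np.bool_(True))
# ... and `np.bool_` is available in both 1.x and 2.x to rebuild it
restored = np.bool_(payload)
print(type(restored), restored)
```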
   
   ### Issues with Pandas
   
   For pandas, I got the following error. I'm not sure what it is about; do you have any context?
   
   <img width="1437" height="687" alt="Image" src="https://github.com/user-attachments/assets/53243982-b569-4769-9c11-39d287bdb3a0" />
   
   ### Running the DAG with `numpy==1.26.4`
   
   ```
   breeze start-airflow --python 3.12 --db-reset --auth-manager FabAuthManager
   ```
   
   <img width="1437" height="687" alt="Image" src="https://github.com/user-attachments/assets/11404dff-bd29-4738-8a39-219449bfa645" />
   
   ### DAG code
   
   ```python
   from airflow.decorators import dag, task
   from pendulum import datetime
   
   
   @dag(
       start_date=datetime(2025, 5, 23),
       schedule=None,
       catchup=False,
       tags=["serialization", "pydantic", "airflow"],
   )
   def pydantic_serde():
   
       # 1. Pandas DataFrame
       @task
       def get_pandas():
           import pandas as pd
           import numpy as np
           df = pd.DataFrame(np.random.randn(3, 2), columns=list("AB"))
           return df
   
       @task
       def print_pandas(df):
           print("Pandas DataFrame:", df)
   
       # 2. Decimal
       @task
       def get_bignum():
           from decimal import Decimal
           return Decimal(1) / Decimal(7)
   
       @task
       def print_bignum(n):
           print("Decimal:", n, type(n))
   
       # 3. Built-in collections
       @task
       def get_all_builtins():
           return {
               "list": [1, 2, 3],
               "set": {4, 5},
               "tuple": (6, 7),
               "frozenset": frozenset([8, 9])
           }
   
       @task
       def print_all_builtins(obj):
           print("Built-in Types:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 4. NumPy scalar types - integers
       @task
       def get_numpy_ints():
           import numpy as np
           return {
               "int8": np.int8(8),
               "int16": np.int16(16),
               "int32": np.int32(32),
               "int64": np.int64(64),
               "uint8": np.uint8(8),
               "uint16": np.uint16(16),
               "uint32": np.uint32(32),
               "uint64": np.uint64(64),
           }
   
       @task
       def print_numpy_ints(obj):
           print("NumPy Integers:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 5. NumPy scalar types - misc
       @task
       def get_numpy_misc():
           import numpy as np
           print(type(np.bool_(False)))
           print(np.bool_.__name__)
           return {
               "bool_": np.bool_(False),
               "float16": np.float16(0.125),
               "float64": np.float64(3.14159),
               "complex64": np.complex64(1 + 2j),
               "complex128": np.complex128(3 + 4j),
           }
   
       @task
       def print_numpy_misc(obj):
           print("NumPy Misc Types:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 6. Python datetime types
       @task
       def get_python_datetime_types():
           import datetime
           return {
               "date": datetime.date(2025, 6, 29),
               "datetime": datetime.datetime(2025, 6, 29, 12, 34, 56),
               "timedelta": datetime.timedelta(days=1, seconds=3600)
           }
   
       @task
       def print_python_datetime_types(obj):
           print("Python datetime types:")
           for k, v in obj.items():
               print(f"{k}: {v} ({type(v)})")
   
       # 7. Pendulum datetime
       @task
       def get_pendulum_datetime_type():
           import pendulum
           dt = pendulum.datetime(2025, 6, 29, 12, 34, 56, tz="Europe/Paris")
           return dt
   
       @task
       def print_pendulum_datetime_type(dt):
           print("Pendulum DateTime:", dt, type(dt))
   
       # 8. Timezone-aware datetime (ZoneInfo)
       @task
       def get_timezone_aware():
           from zoneinfo import ZoneInfo
           from datetime import datetime as dt
           return dt(2025, 6, 29, 12, 0, tzinfo=ZoneInfo("America/New_York"))
   
       @task
       def print_timezone_aware(tz_dt):
           print("Timezone-aware datetime:", tz_dt, type(tz_dt))
   
       # 9. Pendulum timezone object
       @task
       def get_pendulum_tz():
           import pendulum
           return pendulum.timezone("Asia/Tokyo")
   
       @task
       def print_pendulum_tz(tz):
           print("Pendulum timezone:", tz, type(tz))
   
       # 10. ZoneInfo timezone object
       @task
       def get_zoneinfo_tz():
           from zoneinfo import ZoneInfo
           return ZoneInfo("America/Toronto")
       
       @task
       def print_zoneinfo_tz(tz):
           print("ZoneInfo timezone:", tz, type(tz))
   
       # 11. Cohere embeddings (Pydantic model)
       @task
       def get_embeddings():
           import pydantic
           from airflow.providers.cohere.hooks.cohere import CohereHook
   
           hook = CohereHook()
           embeddings = hook.create_embeddings(["gruyere"])
   
           print("Cohere embeddings type:", type(embeddings))
           print("Is Pydantic model?", isinstance(embeddings, pydantic.BaseModel))
           return embeddings
   
       @task
       def print_embeddings(obj):
           print("Cohere Embeddings (Pydantic Model):", obj)
   
       # DAG chaining
       print_pandas(get_pandas())
       print_bignum(get_bignum())
       print_all_builtins(get_all_builtins())
       print_numpy_ints(get_numpy_ints())
       print_numpy_misc(get_numpy_misc())
       print_python_datetime_types(get_python_datetime_types())
       print_pendulum_datetime_type(get_pendulum_datetime_type())
       print_timezone_aware(get_timezone_aware())
       print_pendulum_tz(get_pendulum_tz())
       print_zoneinfo_tz(get_zoneinfo_tz())
       print_embeddings(get_embeddings())
   
   pydantic_serde()
   
   ```
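To see which of the NumPy values returned by `get_numpy_ints` and `get_numpy_misc` would no longer match the serializer's name list, the same types can be checked without starting Breeze. This is a sketch: `REGISTERED` copies the names from the proposal list above plus the existing `numpy.bool_` entry, and the qualified name is assumed to be `module.qualname` of the value's type.

```python
import numpy as np

# names the numpy serializer currently checks for (the proposal list above
# plus the existing "numpy.bool_" entry)
REGISTERED = {
    "numpy.int8", "numpy.int16", "numpy.int32", "numpy.int64",
    "numpy.uint8", "numpy.uint16", "numpy.uint32", "numpy.uint64",
    "numpy.bool_", "numpy.float64", "numpy.float16",
    "numpy.complex128", "numpy.complex64",
}

# the same values the DAG's get_numpy_ints / get_numpy_misc tasks return
values = [
    np.int8(8), np.int16(16), np.int32(32), np.int64(64),
    np.uint8(8), np.uint16(16), np.uint32(32), np.uint64(64),
    np.bool_(False), np.float16(0.125), np.float64(3.14159),
    np.complex64(1 + 2j), np.complex128(3 + 4j),
]

# qualified names of the value types that are not in the registered list
unmatched = sorted(
    {f"{type(v).__module__}.{type(v).__qualname__}" for v in values} - REGISTERED
)
print(np.__version__, unmatched)
```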

