sjyangkevin commented on issue #52753:

URL: https://github.com/apache/airflow/issues/52753#issuecomment-3092694495
Hi @Lee-W, @potiuk, I had a look into it, and it seems there are issues with both pandas and numpy after upgrading numpy from `1.26.4` to `2.2.6`. Below is how I set up the Breeze environment; please correct me if there is anything wrong with the approach.

### Breeze Setup

First, I downloaded the `numpy==2.2.6` wheel into `dist`:

```
pip download numpy==2.2.6 --dest dist --no-deps
```

This gives the following wheel:

```
numpy-2.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
```

Then, I ran the following command to start Airflow locally:

```
breeze start-airflow --python 3.12 --db-reset --auth-manager FabAuthManager --use-distributions-from-dist
```

### Issues with numpy

For **numpy**, after upgrading to `2.2.6`, `qualname(o)` for `o = np.bool_(False)` resolves to `numpy.bool` (the type prints as `<class 'numpy.bool'>`), whereas the numpy serializer only checks for `numpy.bool_`. With numpy `1.26.4`, the same object's type is `<class 'numpy.bool_'>`, so it matches.

Still, I think the deserializer should keep reconstructing the object with `numpy.bool_`, since that name exists in both numpy 1.x and 2.x. I think we need to update this logic to something like the following:

```python
from importlib.metadata import version

serializers = [
    "numpy.int8",
    "numpy.int16",
    "numpy.int32",
    "numpy.int64",
    "numpy.uint8",
    "numpy.uint16",
    "numpy.uint32",
    "numpy.uint64",
    "numpy.float64",
    "numpy.float16",
    "numpy.complex128",
    "numpy.complex64",
]

# Read the installed numpy version from package metadata (no numpy import needed)
numpy_major = int(version("numpy").split(".")[0])
if numpy_major == 1:
    serializers += ["numpy.bool_"]
elif numpy_major == 2:
    serializers += ["numpy.bool"]

# In deserialize(): if the class is numpy.bool or numpy.bool_, rebuild the value
# with numpy.bool_(data), which is available in both numpy 1.x and 2.x.
```

https://github.com/apache/airflow/blob/11f512127917b781bba4d5de7dc336f9ffb56e9d/airflow-core/src/airflow/serialization/serializers/numpy.py#L34

<img width="1437" height="687" alt="Image" src="https://github.com/user-attachments/assets/e42d6058-dad0-4eba-93b9-511cb5753d9f" />

### Issues with Pandas

For pandas, I got the following error. I'm not sure what it is about; do you have any context?
<img width="1437" height="687" alt="Image" src="https://github.com/user-attachments/assets/53243982-b569-4769-9c11-39d287bdb3a0" />

### Running the DAG with `numpy==1.26.4`

```
breeze start-airflow --python 3.12 --db-reset --auth-manager FabAuthManager
```

<img width="1437" height="687" alt="Image" src="https://github.com/user-attachments/assets/11404dff-bd29-4738-8a39-219449bfa645" />

### DAG code

```python
from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
    tags=["serialization", "pydantic", "airflow"],
)
def pydantic_serde():
    # 1. Pandas DataFrame
    @task
    def get_pandas():
        import pandas as pd
        import numpy as np
        df = pd.DataFrame(np.random.randn(3, 2), columns=list("AB"))
        return df

    @task
    def print_pandas(df):
        print("Pandas DataFrame:", df)

    # 2. Decimal
    @task
    def get_bignum():
        from decimal import Decimal
        return Decimal(1) / Decimal(7)

    @task
    def print_bignum(n):
        print("Decimal:", n, type(n))

    # 3. Built-in collections
    @task
    def get_all_builtins():
        return {
            "list": [1, 2, 3],
            "set": {4, 5},
            "tuple": (6, 7),
            "frozenset": frozenset([8, 9]),
        }

    @task
    def print_all_builtins(obj):
        print("Built-in Types:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 4. NumPy scalar types - integers
    @task
    def get_numpy_ints():
        import numpy as np
        return {
            "int8": np.int8(8),
            "int16": np.int16(16),
            "int32": np.int32(32),
            "int64": np.int64(64),
            "uint8": np.uint8(8),
            "uint16": np.uint16(16),
            "uint32": np.uint32(32),
            "uint64": np.uint64(64),
        }

    @task
    def print_numpy_ints(obj):
        print("NumPy Integers:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 5. NumPy scalar types - misc
    @task
    def get_numpy_misc():
        import numpy as np
        print(type(np.bool_(False)))
        print(np.bool_.__name__)
        return {
            "bool_": np.bool_(False),
            "float16": np.float16(0.125),
            "float64": np.float64(3.14159),
            "complex64": np.complex64(1 + 2j),
            "complex128": np.complex128(3 + 4j),
        }

    @task
    def print_numpy_misc(obj):
        print("NumPy Misc Types:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 6. Python datetime types
    @task
    def get_python_datetime_types():
        import datetime
        return {
            "date": datetime.date(2025, 6, 29),
            "datetime": datetime.datetime(2025, 6, 29, 12, 34, 56),
            "timedelta": datetime.timedelta(days=1, seconds=3600),
        }

    @task
    def print_python_datetime_types(obj):
        print("Python datetime types:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 7. Pendulum datetime
    @task
    def get_pendulum_datetime_type():
        import pendulum
        dt = pendulum.datetime(2025, 6, 29, 12, 34, 56, tz="Europe/Paris")
        return dt

    @task
    def print_pendulum_datetime_type(dt):
        print("Pendulum DateTime:", dt, type(dt))

    # 8. Timezone-aware datetime (ZoneInfo)
    @task
    def get_timezone_aware():
        from zoneinfo import ZoneInfo
        from datetime import datetime as dt
        return dt(2025, 6, 29, 12, 0, tzinfo=ZoneInfo("America/New_York"))

    @task
    def print_timezone_aware(tz_dt):
        print("Timezone-aware datetime:", tz_dt, type(tz_dt))

    # 9. Pendulum timezone object
    @task
    def get_pendulum_tz():
        import pendulum
        return pendulum.timezone("Asia/Tokyo")

    @task
    def print_pendulum_tz(tz):
        print("Pendulum timezone:", tz, type(tz))

    # 10. ZoneInfo timezone object
    @task
    def get_zoneinfo_tz():
        from zoneinfo import ZoneInfo
        return ZoneInfo("America/Toronto")

    @task
    def print_zoneinfo_tz(tz):
        print("ZoneInfo timezone:", tz, type(tz))

    # 11. Cohere embeddings (Pydantic model)
    @task
    def get_embeddings():
        import pydantic
        from airflow.providers.cohere.hooks.cohere import CohereHook
        hook = CohereHook()
        embeddings = hook.create_embeddings(["gruyere"])
        print("Cohere embeddings type:", type(embeddings))
        print("Is Pydantic model?", isinstance(embeddings, pydantic.BaseModel))
        return embeddings

    @task
    def print_embeddings(obj):
        print("Cohere Embeddings (Pydantic Model):", obj)

    # DAG chaining
    print_pandas(get_pandas())
    print_bignum(get_bignum())
    print_all_builtins(get_all_builtins())
    print_numpy_ints(get_numpy_ints())
    print_numpy_misc(get_numpy_misc())
    print_python_datetime_types(get_python_datetime_types())
    print_pendulum_datetime_type(get_pendulum_datetime_type())
    print_timezone_aware(get_timezone_aware())
    print_pendulum_tz(get_pendulum_tz())
    print_zoneinfo_tz(get_zoneinfo_tz())
    print_embeddings(get_embeddings())


pydantic_serde()
```
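To check the `numpy.bool` / `numpy.bool_` naming difference without starting Breeze, here is a small standalone sketch. All names in it (`type_qualname`, `is_numpy_bool_name`, `deserialization_target`, the constants) are hypothetical and not part of Airflow; `type_qualname` is only a simplified stand-in for Airflow's `qualname`, assuming the name is built from the type's `__module__` and `__qualname__`.

```python
"""Hypothetical helpers illustrating the numpy.bool / numpy.bool_ naming split.

The helpers work on plain strings and types, so numpy is only imported in the
optional check at the bottom.
"""

# Class names numpy reports for its boolean scalar type:
# numpy 1.x -> "numpy.bool_", numpy 2.x -> "numpy.bool".
NUMPY_BOOL_NAMES = frozenset({"numpy.bool_", "numpy.bool"})

# numpy.bool_ is importable in both 1.x and 2.x (in 2.x it is an alias of
# numpy.bool), so it is used as the constructor when deserializing.
NUMPY_BOOL_CONSTRUCTOR = "numpy.bool_"


def type_qualname(obj: object) -> str:
    """Return '<module>.<qualname>' for obj's type (or for obj if it is a class)."""
    cls = obj if isinstance(obj, type) else type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


def is_numpy_bool_name(classname: str) -> bool:
    """True if classname is the numpy bool scalar name from either major version."""
    return classname in NUMPY_BOOL_NAMES


def deserialization_target(classname: str) -> str:
    """Name to import when rebuilding a value serialized under classname."""
    return NUMPY_BOOL_CONSTRUCTOR if is_numpy_bool_name(classname) else classname


if __name__ == "__main__":
    try:
        import numpy as np
    except ImportError:
        print("numpy is not installed")
    else:
        name = type_qualname(np.bool_(False))
        print(np.__version__, name, "->", deserialization_target(name))
```

With numpy `2.2.6` the reported name should be `numpy.bool`, and with `1.26.4` it should be `numpy.bool_`; both map to `numpy.bool_` for deserialization.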