sjyangkevin commented on PR #52360: URL: https://github.com/apache/airflow/pull/52360#issuecomment-3017910224
Hi @bolkedebruin, @potiuk, @amoghrajesh, I would appreciate your help again in reviewing this PR. I also want to share some findings from testing.

### Issue with serializing `numpy.bool`

It looks like the `serialize` method in the `numpy.py` module has an issue serializing `numpy.bool` objects. I think this is because the object's qualified name (i.e., `numpy.bool`) does not match any of the entries in the `serializers` list. Adding a `numpy.bool` entry to the `serializers` list solves the issue. However, it fails the test `airflow-core/tests/unit/serialization/test_serde.py::TestSerDe::test_serializers_importable_and_str`. Details are here: https://github.com/apache/airflow/actions/runs/15964146039/job/45021686182.

The current list is:

```python
serializers = [
    "numpy.int8",
    "numpy.int16",
    "numpy.int32",
    "numpy.int64",
    "numpy.uint8",
    "numpy.uint16",
    "numpy.uint32",
    "numpy.uint64",
    "numpy.bool_",
    "numpy.float64",
    "numpy.float16",
    "numpy.complex128",
    "numpy.complex64",
]
```

### Issue with a unit test case

I also found a failing test case when I ran pytest in the Breeze environment on `airflow-core/tests/unit/serialization`. It looks like the DAG is missing, and I'm not sure whether some cleanup is needed for this. I didn't see this failure when running `breeze testing core-tests --test-type Serialization`.

### Test DAG code

I also updated the DAG code to test as many object types as possible in serialize/deserialize, except for Iceberg and Delta Lake.

```python
from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
    tags=["serialization", "pydantic", "airflow"],
)
def pydantic_serde():
    # 1. Pandas DataFrame
    @task
    def get_pandas():
        import pandas as pd
        import numpy as np

        df = pd.DataFrame(np.random.randn(3, 2), columns=list("AB"))
        return df

    @task
    def print_pandas(df):
        print("Pandas DataFrame:", df)

    # 2. Decimal
    @task
    def get_bignum():
        from decimal import Decimal

        return Decimal(1) / Decimal(7)

    @task
    def print_bignum(n):
        print("Decimal:", n, type(n))

    # 3. Built-in collections
    @task
    def get_all_builtins():
        return {
            "list": [1, 2, 3],
            "set": {4, 5},
            "tuple": (6, 7),
            "frozenset": frozenset([8, 9]),
        }

    @task
    def print_all_builtins(obj):
        print("Built-in Types:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 4. NumPy scalar types - integers
    @task
    def get_numpy_ints():
        import numpy as np

        return {
            "int8": np.int8(8),
            "int16": np.int16(16),
            "int32": np.int32(32),
            "int64": np.int64(64),
            "uint8": np.uint8(8),
            "uint16": np.uint16(16),
            "uint32": np.uint32(32),
            "uint64": np.uint64(64),
        }

    @task
    def print_numpy_ints(obj):
        print("NumPy Integers:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 5. NumPy scalar types - misc
    @task
    def get_numpy_misc():
        import numpy as np

        return {
            "bool_": np.bool_(0),
            "float16": np.float16(0.125),
            "float64": np.float64(3.14159),
            "complex64": np.complex64(1 + 2j),
            "complex128": np.complex128(3 + 4j),
        }

    @task
    def print_numpy_misc(obj):
        print("NumPy Misc Types:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 6. Python datetime types
    @task
    def get_python_datetime_types():
        import datetime

        return {
            "date": datetime.date(2025, 6, 29),
            "datetime": datetime.datetime(2025, 6, 29, 12, 34, 56),
            "timedelta": datetime.timedelta(days=1, seconds=3600),
        }

    @task
    def print_python_datetime_types(obj):
        print("Python datetime types:")
        for k, v in obj.items():
            print(f"{k}: {v} ({type(v)})")

    # 7. Pendulum datetime
    @task
    def get_pendulum_datetime_type():
        import pendulum

        dt = pendulum.datetime(2025, 6, 29, 12, 34, 56, tz="Europe/Paris")
        return dt

    @task
    def print_pendulum_datetime_type(dt):
        print("Pendulum DateTime:", dt, type(dt))

    # 8. Timezone-aware datetime (ZoneInfo)
    @task
    def get_timezone_aware():
        from zoneinfo import ZoneInfo
        from datetime import datetime as dt

        return dt(2025, 6, 29, 12, 0, tzinfo=ZoneInfo("America/New_York"))

    @task
    def print_timezone_aware(tz_dt):
        print("Timezone-aware datetime:", tz_dt, type(tz_dt))

    # 9. Pendulum timezone object
    @task
    def get_pendulum_tz():
        import pendulum

        return pendulum.timezone("Asia/Tokyo")

    @task
    def print_pendulum_tz(tz):
        print("Pendulum timezone:", tz, type(tz))

    # 10. ZoneInfo timezone object
    @task
    def get_zoneinfo_tz():
        from zoneinfo import ZoneInfo

        return ZoneInfo("America/Toronto")

    @task
    def print_zoneinfo_tz(tz):
        print("ZoneInfo timezone:", tz, type(tz))

    # 11. Cohere embeddings (Pydantic model)
    @task
    def get_embeddings():
        import pydantic
        from airflow.providers.cohere.hooks.cohere import CohereHook

        hook = CohereHook()
        embeddings = hook.create_embeddings(["gruyere"])
        print("Cohere embeddings type:", type(embeddings))
        print("Is Pydantic model?", isinstance(embeddings, pydantic.BaseModel))
        return embeddings

    @task
    def print_embeddings(obj):
        print("Cohere Embeddings (Pydantic Model):", obj)

    # DAG chaining
    print_pandas(get_pandas())
    print_bignum(get_bignum())
    print_all_builtins(get_all_builtins())
    print_numpy_ints(get_numpy_ints())
    print_numpy_misc(get_numpy_misc())
    print_python_datetime_types(get_python_datetime_types())
    print_pendulum_datetime_type(get_pendulum_datetime_type())
    print_timezone_aware(get_timezone_aware())
    print_pendulum_tz(get_pendulum_tz())
    print_zoneinfo_tz(get_zoneinfo_tz())
    print_embeddings(get_embeddings())


pydantic_serde()
```

### DAG Test Results

I modified the Cohere provider in the Breeze environment so that it returns a Pydantic model.

1. When the Pydantic model is in the whitelist, all the tests defined in the DAG passed.
2. When the Pydantic model is removed from the whitelist, the `print_embeddings` task failed with an `ImportError`.

To whitelist the Pydantic model, add its fully qualified class name to the `[core] allowed_deserialization_classes` option, which can be set through an environment variable.
Add the following to `files/airflow-breeze-config/environment_variables.env`:

```txt
AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES=cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings
```

The **numpy** check passed because I had added `numpy.bool` to the `serializers` list, but that change is not included in this PR because it failed the checks.
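Regarding the `numpy.bool` issue, one plausible explanation, which I have not confirmed against the CI log, is a NumPy version difference. In NumPy 2.x the canonical name of the boolean scalar type is `numpy.bool` (with `numpy.bool_` kept as an alias), while NumPy 1.x names it `numpy.bool_`, and NumPy 1.24 to 1.26 have no `numpy.bool` attribute at all. If the test resolves every `serializers` entry by importing it on a NumPy 1.x install, a `"numpy.bool"` entry would fail there, even though that is the qualified name seen at runtime on 2.x. The sketch below prints both facts for the installed NumPy. `qualname_of`, `is_importable` and `report_numpy_bool` are hypothetical helpers written for this comment, not Airflow's `qualname`/`import_string`.

```python
import importlib


def qualname_of(obj: object) -> str:
    """Return "<module>.<qualname>" for a class, or for the class of an instance.

    Hypothetical helper; it mimics the kind of qualified name serde matches on.
    """
    cls = obj if isinstance(obj, type) else type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


def is_importable(dotted_path: str) -> bool:
    """Return True if "<module>.<attr>" resolves via import_module + getattr."""
    module_name, _, attr = dotted_path.rpartition(".")
    if not module_name:
        return False
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return False
    # On NumPy 1.24-1.26, numpy's module __getattr__ raises AttributeError
    # for the removed "bool" alias, so hasattr() returns False there.
    return hasattr(module, attr)


def report_numpy_bool() -> None:
    """Print the runtime qualified name of a NumPy bool and which entries resolve."""
    try:
        import numpy as np
    except ImportError:
        print("numpy is not installed")
        return
    print("numpy", np.__version__, "->", qualname_of(np.bool_(0)))
    for name in ("numpy.bool_", "numpy.bool"):
        print(f"{name!r} importable: {is_importable(name)}")


if __name__ == "__main__":
    report_numpy_bool()
```

If the explanation holds, a NumPy 2.x install should report `numpy.bool` with both entries importable, while NumPy 1.24 to 1.26 should report `numpy.bool_` with only `"numpy.bool_"` importable.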
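On the whitelist setting: the Airflow configuration reference describes `[core] allowed_deserialization_classes` as a multi-line value whose items are treated as glob patterns. The sketch below approximates that check with `fnmatch.fnmatchcase`, assuming whitespace-separated patterns. It illustrates the matching only and is not serde's actual implementation. `is_allowed` is a hypothetical name.

```python
from fnmatch import fnmatchcase


def is_allowed(classname: str, setting: str) -> bool:
    """Return True if classname matches any glob pattern in the setting.

    Assumes patterns are separated by whitespace/newlines, as in a multi-line
    config value. Hypothetical approximation of the allowlist check.
    """
    patterns = setting.split()
    return any(fnmatchcase(classname, pattern) for pattern in patterns)


EMBEDDINGS = "cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings"

if __name__ == "__main__":
    # Exact class name, as in the environment variable used for the DAG test.
    print(is_allowed(EMBEDDINGS, EMBEDDINGS))  # True
    # A pattern covering only airflow.* classes does not admit the Cohere model.
    print(is_allowed(EMBEDDINGS, "airflow.*"))  # False
    # "*" also matches dots, so this admits every class under cohere.types.
    print(is_allowed(EMBEDDINGS, "airflow.*\ncohere.types.*"))  # True
```

The last case shows the trade-off of a broad pattern: it would whitelist every class under `cohere.types`, whereas the exact name in the environment variable admits only the embeddings model.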
