sjyangkevin commented on PR #51059:
URL: https://github.com/apache/airflow/pull/51059#issuecomment-3002938787
I would like to attach my test DAG code, which checks most of the serializers
and deserializers, except for iceberg and deltalake. I hope this is helpful
for the review. Thanks.

```python
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.cohere.hooks.cohere import CohereHook
from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator
from pendulum import datetime
# Presumably the Airflow connection id for Cohere (inferred from the name).
# NOTE(review): nothing in the visible snippet references this constant.
COHERE_CONN_ID = "cohere_default"
# Test DAG made of producer/consumer task pairs: each ``get_*`` task returns
# one value of a particular type and the matching ``print_*`` task receives
# it and prints it. Presumably the hand-off between the two tasks is what
# exercises Airflow's serializers/deserializers -- confirm against the
# Airflow serialization docs.
#
# ``#`` comments are used instead of docstrings throughout so that no
# ``__doc__`` is added to the decorated callables.
@dag(
    start_date=datetime(2025, 5, 23),
    schedule=None,
    catchup=False,
)
def pydantic_serde():
    # --- pydantic model (Cohere embeddings) --------------------------------
    @task
    def get_embeddings():
        # this uses the older provider version when embedding is returned as
        # a pydantic model
        import pydantic

        cohere_hook = CohereHook()
        embeddings = cohere_hook.create_embeddings(["gruyere"])
        print("type of embeddings:", type(embeddings))
        print(
            "is embedding type pydantic:",
            isinstance(embeddings, pydantic.BaseModel),
        )
        return embeddings

    @task
    def print_embeddings(embeddings):
        print("Pydantic Model")
        print(embeddings)

    # --- numpy -------------------------------------------------------------
    @task
    def get_numpy():
        import numpy as np

        # A single element is picked out of the 3x2 array, so the value
        # returned is one NumPy number rather than the whole array.
        n = np.random.rand(3, 2)[0][0]
        print(type(n))
        return n

    @task
    def print_numpy(n):
        print("NumPy Array")
        print(n)

    # --- pandas ------------------------------------------------------------
    @task
    def get_pandas():
        import numpy as np
        import pandas as pd

        return pd.DataFrame(np.random.randn(3, 2), columns=list("AB"))

    @task
    def print_pandas(df):
        print("Pandas DataFrame")
        print(df)

    # --- builtin collections -----------------------------------------------
    @task
    def get_list():
        return [1, 2, 3, 4]

    @task
    def print_list(values):
        print(values)

    @task
    def get_set():
        return {1, 2, 3, 4}

    @task
    def print_set(s):
        print(s)

    @task
    def get_tuple():
        return (1, 2, 3, 4)

    @task
    def print_tuple(t):
        print(t)

    @task
    def get_frozenset():
        return frozenset([1, 2, 3, 4])

    @task
    def print_frozenset(fs):
        print(fs)

    # --- decimal -----------------------------------------------------------
    @task
    def get_bignum():
        import decimal

        return decimal.Decimal(1234567891011)

    @task
    def print_bignum(n):
        print("bignum:", n)

    # --- datetimes and timezones -------------------------------------------
    # The standard-library ``datetime`` class is imported under an alias in
    # the tasks below so it does not shadow the pendulum ``datetime`` that
    # the ``@dag`` decorator arguments use.
    @task
    def get_datetime():
        from datetime import datetime as stdlib_datetime

        # No tz argument is passed, so this is a naive datetime.
        return stdlib_datetime.now()

    @task
    def print_datetime(dt):
        print(dt)

    @task
    def get_pendulum_datetime():
        import pendulum

        return pendulum.now()

    @task
    def print_pendulum_datetime(dt):
        print(dt)

    @task
    def get_timezone():
        from datetime import datetime as stdlib_datetime
        from zoneinfo import ZoneInfo

        # Despite the task name, this returns a ZoneInfo-aware datetime,
        # not a bare timezone object.
        return stdlib_datetime(
            2020, 10, 31, 12, tzinfo=ZoneInfo("America/Toronto")
        )

    @task
    def print_timezone(tz):
        print(tz)

    @task
    def get_pendulum_tz():
        import pendulum

        return pendulum.timezone("Europe/Paris")

    @task
    def print_pendulum_tz(tz):
        print(tz)

    # --- wiring: feed every producer into its printer ----------------------
    print_embeddings(get_embeddings())
    print_numpy(get_numpy())
    print_pandas(get_pandas())
    print_list(get_list())
    print_set(get_set())
    print_tuple(get_tuple())
    print_bignum(get_bignum())
    print_datetime(get_datetime())
    print_pendulum_datetime(get_pendulum_datetime())
    print_frozenset(get_frozenset())
    print_timezone(get_timezone())
    print_pendulum_tz(get_pendulum_tz())
# Call the @dag-decorated function at module level; presumably this is what
# instantiates the DAG so Airflow can discover it -- confirm against the
# TaskFlow documentation.
pydantic_serde()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]