uranusjr commented on code in PR #58669:
URL: https://github.com/apache/airflow/pull/58669#discussion_r2583831885


##########
airflow-core/src/airflow/serialization/encoders.py:
##########
@@ -0,0 +1,296 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import contextlib
+import datetime
+import functools
+from typing import TYPE_CHECKING, Any, TypeVar, overload
+
+import attrs
+import pendulum
+
+from airflow.sdk import (
+    Asset,
+    AssetAlias,
+    AssetAll,
+    AssetAny,
+    AssetOrTimeSchedule,
+    CronDataIntervalTimetable,
+    CronTriggerTimetable,
+    DeltaDataIntervalTimetable,
+    DeltaTriggerTimetable,
+    EventsTimetable,
+    MultipleCronTriggerTimetable,
+)
+from airflow.sdk.definitions.asset import AssetRef
+from airflow.sdk.definitions.timetables.assets import AssetTriggeredTimetable
+from airflow.sdk.definitions.timetables.base import BaseTimetable
+from airflow.sdk.definitions.timetables.simple import ContinuousTimetable, NullTimetable, OnceTimetable
+from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
+from airflow.serialization.helpers import find_registered_custom_timetable, is_core_timetable_import_path
+from airflow.timetables.base import Timetable as CoreTimetable
+from airflow.utils.docs import get_docs_url
+from airflow.utils.module_loading import qualname
+
+if TYPE_CHECKING:
+    from dateutil.relativedelta import relativedelta
+
+    from airflow.sdk.definitions.asset import BaseAsset
+    from airflow.triggers.base import BaseEventTrigger
+
+    T = TypeVar("T")
+
+
+def encode_relativedelta(var: relativedelta) -> dict[str, Any]:
+    """Encode a relativedelta object."""
+    encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_") and v}
+    if var.weekday and var.weekday.n:
+        # Every n'th Friday for example
+        encoded["weekday"] = [var.weekday.weekday, var.weekday.n]
+    elif var.weekday:
+        encoded["weekday"] = [var.weekday.weekday]
+    return encoded
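
A quick sketch (not part of the diff) of what this encoder produces: truthy,
non-private relativedelta fields are kept, and a weekday such as FR(2) (the
second Friday) becomes a JSON-friendly [weekday, n] pair:

    from dateutil.relativedelta import FR, relativedelta

    rd = relativedelta(months=1, weekday=FR(2))
    # FR.weekday == 4, so the weekday encodes as [4, 2]
    assert encode_relativedelta(rd) == {"months": 1, "weekday": [4, 2]}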
+
+
+def encode_timezone(var: str | pendulum.Timezone | pendulum.FixedTimezone) -> str | int:
+    """
+    Encode a Pendulum Timezone for serialization.
+
+    Airflow only supports timezone objects that implement Pendulum's Timezone
+    interface. We try to keep as much information as possible to make
+    round-tripping (see ``decode_timezone``) possible. UTC needs special-casing:
+    Pendulum implements it as a FixedTimezone (i.e. it would get encoded as 0
+    without the special case), but passing 0 into ``pendulum.timezone`` gives
+    us ``+00:00``, not UTC.
+    """
+    if isinstance(var, str):
+        return var
+    if isinstance(var, pendulum.FixedTimezone):
+        if var.offset == 0:
+            return "UTC"
+        return var.offset
+    if isinstance(var, pendulum.Timezone):
+        return var.name
+    raise ValueError(
+        f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}. "
+        f"See {get_docs_url('timezone.html#time-zone-aware-dags')}"
+    )
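
A quick sketch (not part of the diff) of the three shapes this can emit,
assuming the top-level ``pendulum.timezone``/``FixedTimezone``/``UTC`` names
used above: named zones keep their name, fixed offsets become their offset in
seconds, and UTC is special-cased so it round-trips as "UTC" rather than as
offset 0:

    assert encode_timezone(pendulum.timezone("Europe/Amsterdam")) == "Europe/Amsterdam"
    assert encode_timezone(pendulum.FixedTimezone(3600)) == 3600
    assert encode_timezone(pendulum.UTC) == "UTC"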
+
+
+def encode_interval(interval: datetime.timedelta | relativedelta) -> float | dict:
+    if isinstance(interval, datetime.timedelta):
+        return interval.total_seconds()
+    return encode_relativedelta(interval)
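
A quick sketch (not part of the diff): timedeltas collapse to plain seconds,
while relativedeltas keep their structure via ``encode_relativedelta``:

    from dateutil.relativedelta import relativedelta

    assert encode_interval(datetime.timedelta(hours=1)) == 3600.0
    assert encode_interval(relativedelta(months=1)) == {"months": 1}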
+
+
+def encode_run_immediately(value: bool | datetime.timedelta) -> bool | float:
+    if isinstance(value, datetime.timedelta):
+        return value.total_seconds()
+    return value
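
A quick sketch (not part of the diff): booleans pass through unchanged, and a
timedelta "grace period" is flattened to seconds:

    assert encode_run_immediately(True) is True
    assert encode_run_immediately(datetime.timedelta(minutes=5)) == 300.0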
+
+
+def encode_trigger(trigger: BaseEventTrigger | dict) -> dict[str, Any]:
+    from airflow.serialization.serialized_objects import BaseSerialization
+
+    def _ensure_serialized(d):
+        """
+        Make sure the kwargs dict is JSON-serializable.
+
+        This is done with BaseSerialization logic. A simple check is added to
+        ensure we don't double-serialize, which is possible when a trigger goes
+        through multiple serialization layers.
+        """
+        if isinstance(d, dict) and Encoding.TYPE in d:
+            return d
+        return BaseSerialization.serialize(d)
+
+    if isinstance(trigger, dict):
+        classpath = trigger["classpath"]
+        kwargs = trigger["kwargs"]
+    else:
+        classpath, kwargs = trigger.serialize()
+    return {
+        "classpath": classpath,
+        "kwargs": {k: _ensure_serialized(v) for k, v in kwargs.items()},
+    }
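
A quick sketch (not part of the diff), using an illustrative classpath: both
input shapes normalize to the same wire format, and because
``_ensure_serialized`` passes through kwargs that already carry
``Encoding.TYPE``, re-encoding the result is a no-op rather than a double
serialization:

    encoded = encode_trigger(
        {
            "classpath": "airflow.triggers.temporal.TimeDeltaTrigger",
            "kwargs": {"delta": datetime.timedelta(seconds=30)},
        }
    )
    assert encoded["classpath"] == "airflow.triggers.temporal.TimeDeltaTrigger"
    assert encode_trigger(encoded) == encoded  # idempotent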
+
+
+def encode_asset_condition(a: BaseAsset) -> dict[str, Any]:
+    """
+    Encode an asset condition.
+
+    :meta private:
+    """
+    d: dict[str, Any]
+    match a:
+        case Asset():
+            d = {"__type": DAT.ASSET, "name": a.name, "uri": a.uri, "group": 
a.group, "extra": a.extra}
+            if a.watchers:
+                d["watchers"] = [{"name": w.name, "trigger": 
encode_trigger(w.trigger)} for w in a.watchers]
+            return d
+        case AssetAlias():
+            return {"__type": DAT.ASSET_ALIAS, "name": a.name, "group": 
a.group}
+        case AssetAll():
+            return {"__type": DAT.ASSET_ALL, "objects": 
[encode_asset_condition(x) for x in a.objects]}
+        case AssetAny():
+            return {"__type": DAT.ASSET_ANY, "objects": 
[encode_asset_condition(x) for x in a.objects]}
+        case AssetRef():
+            return {"__type": DAT.ASSET_REF, **attrs.asdict(a)}
+    raise ValueError(f"serialization not implemented for {type(a).__name__!r}")
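
A quick sketch (not part of the diff): composite conditions encode
recursively, each node tagged with its ``__type``:

    cond = Asset(name="a") | Asset(name="b")  # ``|`` builds an AssetAny
    encoded = encode_asset_condition(cond)
    assert encoded["__type"] == DAT.ASSET_ANY
    assert [o["__type"] for o in encoded["objects"]] == [DAT.ASSET, DAT.ASSET]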
+
+
+def _get_serialized_import_path(var: BaseTimetable | CoreTimetable) -> str:

Review Comment:
   Good catch, I’ll change this.


