kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1891339620


##########
airflow/dag_processing/collection.py:
##########
@@ -419,7 +443,7 @@ def update_dags(
             if run_info.num_active_runs.get(dag.dag_id, 0) >= dm.max_active_runs:
                 dm.next_dagrun_create_after = None
             else:
-                dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)
+                dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)  # type: ignore[arg-type]

Review Comment:
   Why do we need to ignore this?



##########
airflow/dag_processing/collection.py:
##########
@@ -419,7 +443,7 @@ def update_dags(
             if run_info.num_active_runs.get(dag.dag_id, 0) >= dm.max_active_runs:
                 dm.next_dagrun_create_after = None
             else:
-                dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)
+                dm.calculate_dagrun_date_fields(dag, last_automated_data_interval)  # type: ignore[arg-type]

Review Comment:
   Probably because of `MaybeSerializedDAG`?
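
A minimal sketch of how a union alias such as `MaybeSerializedDAG` could produce an `arg-type` error at a call site like the one in the hunk above. It assumes the callee is annotated with only the concrete class. The names `Dag`, `LazyDag`, `MaybeDag`, `calculate_fields`, and `update` are hypothetical stand-ins, not Airflow APIs, and the real signature of `calculate_dagrun_date_fields` is not shown in this thread.

```python
from __future__ import annotations

from typing import Union


class Dag:
    """Hypothetical stand-in for the fully deserialized DAG class."""

    dag_id = "real"


class LazyDag:
    """Hypothetical stand-in for a lazy wrapper exposing the same attribute."""

    dag_id = "lazy"


# Hypothetical union alias, analogous in shape to MaybeSerializedDAG.
MaybeDag = Union[Dag, LazyDag]


def calculate_fields(dag: Dag) -> str:
    # Annotated to accept only the concrete class (an assumption for this sketch).
    return dag.dag_id


def update(dag: MaybeDag) -> str:
    # A static checker such as mypy would be expected to flag this call with an
    # arg-type error, because the union is wider than the parameter type.
    # At runtime the call still works, since both classes provide `dag_id`.
    return calculate_fields(dag)  # type: ignore[arg-type]
```

Under these assumptions, the alternatives to the ignore comment would be widening the callee's annotation to the union or narrowing with `isinstance` before the call.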



##########
airflow/serialization/serialized_objects.py:
##########
@@ -1870,3 +1871,86 @@ def _has_kubernetes() -> bool:
     except ImportError:
         HAS_KUBERNETES = False
     return HAS_KUBERNETES
+
+
+AssetT = TypeVar("AssetT", bound=BaseAsset)
+MaybeSerializedDAG = Union[DAG, "LazyDeserializedDAG"]
+
+
+class LazyDeserializedDAG(pydantic.BaseModel):
+    """
+    Lazily build information from the serialized DAG structure.
+
+    An object that will present "enough" of the DAG like interface to update DAG db models etc, without having
+    to deserialize the full DAG and Task hierarchy.
+    """
+
+    data: dict
+
+    NULLABLE_PROPERTIES: ClassVar[set[str]] = {
+        "is_paused_upon_creation",
+        "owner",
+        "dag_display_name",
+        "description",
+        "max_active_tasks",
+        "max_active_runs",
+        "max_consecutive_failed_dag_runs",
+        "owner_links",
+        "access_control",
+        "default_view",
+    }
+
+    @property
+    def hash(self) -> str:
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        return SerializedDagModel.hash(self.data)
+
+    def next_dagrun_info(self, *args, **kwargs) -> DagRunInfo | None:
+        # This function is complex to implement, for now we delegate deserialize the dag and delegate to that.
+        return self._real_dag.next_dagrun_info(*args, **kwargs)
+
+    @cached_property
+    def _real_dag(self):
+        return SerializedDAG.from_dict(self.data)
+
+    def __getattr__(self, name: str, /) -> Any:
+        if name in self.NULLABLE_PROPERTIES:
+            return self.data["dag"].get(name)
+        try:
+            return self.data["dag"][name]
+        except KeyError:
+            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}") from None
+
+    @property
+    def timetable(self) -> Timetable:
+        return decode_timetable(self.data["dag"]["timetable"])
+
+    @property
+    def has_task_concurrency_limits(self) -> bool:
+        return any(task.get("max_active_tis_per_dag") is not None for task in self.data["dag"]["tasks"])
+
+    def get_task_assets(
+        self,
+        inlets: bool = True,
+        outlets: bool = True,
+        of_type: type[AssetT] = Asset,  # type: ignore[assignment]
+    ) -> Generator[tuple[str, AssetT], None, None]:

Review Comment:
   Worth adding a comment explaining why we have a method for getting `task_assets` here.
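
For readers following the hunk above, the lazy attribute lookup can be sketched in isolation as below. This is a simplified illustration that uses a plain class instead of `pydantic.BaseModel`. The names `LazyView`, `NULLABLE`, and `real` are hypothetical, and the `real` property only copies a dict as a stand-in for the expensive `SerializedDAG.from_dict` call.

```python
from functools import cached_property
from typing import Any


class LazyView:
    """Hypothetical wrapper that reads attributes straight from serialized data."""

    # Keys that may be absent from the serialized dict; reading them yields None.
    NULLABLE = {"description", "owner"}

    def __init__(self, data: dict) -> None:
        self.data = data

    @cached_property
    def real(self) -> dict:
        # Stand-in for full deserialization: computed on first access, then cached.
        return dict(self.data["dag"])

    def __getattr__(self, name: str) -> Any:
        # Only called when normal attribute lookup fails.
        if name in self.NULLABLE:
            return self.data["dag"].get(name)
        try:
            return self.data["dag"][name]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None


view = LazyView({"dag": {"dag_id": "example"}})
```

Here `view.dag_id` is read from the dict, `view.description` falls back to `None`, and any other missing key raises `AttributeError`, so `hasattr` behaves as callers expect.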



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
