kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1891339620
##########
airflow/dag_processing/collection.py:
##########
@@ -419,7 +443,7 @@ def update_dags(
if run_info.num_active_runs.get(dag.dag_id, 0) >=
dm.max_active_runs:
dm.next_dagrun_create_after = None
else:
- dm.calculate_dagrun_date_fields(dag,
last_automated_data_interval)
+ dm.calculate_dagrun_date_fields(dag,
last_automated_data_interval) # type: ignore[arg-type]
Review Comment:
Why do we need to ignore this?
##########
airflow/dag_processing/collection.py:
##########
@@ -419,7 +443,7 @@ def update_dags(
if run_info.num_active_runs.get(dag.dag_id, 0) >=
dm.max_active_runs:
dm.next_dagrun_create_after = None
else:
- dm.calculate_dagrun_date_fields(dag,
last_automated_data_interval)
+ dm.calculate_dagrun_date_fields(dag,
last_automated_data_interval) # type: ignore[arg-type]
Review Comment:
Probably because of `MaybeSerializedDAG`?
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1870,3 +1871,86 @@ def _has_kubernetes() -> bool:
except ImportError:
HAS_KUBERNETES = False
return HAS_KUBERNETES
+
+
+AssetT = TypeVar("AssetT", bound=BaseAsset)
+MaybeSerializedDAG = Union[DAG, "LazyDeserializedDAG"]
+
+
+class LazyDeserializedDAG(pydantic.BaseModel):
+ """
+ Lazily build information from the serialized DAG structure.
+
+ An object that will present "enough" of the DAG like interface to update
DAG db models etc, without having
+ to deserialize the full DAG and Task hierarchy.
+ """
+
+ data: dict
+
+ NULLABLE_PROPERTIES: ClassVar[set[str]] = {
+ "is_paused_upon_creation",
+ "owner",
+ "dag_display_name",
+ "description",
+ "max_active_tasks",
+ "max_active_runs",
+ "max_consecutive_failed_dag_runs",
+ "owner_links",
+ "access_control",
+ "default_view",
+ }
+
+ @property
+ def hash(self) -> str:
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ return SerializedDagModel.hash(self.data)
+
+ def next_dagrun_info(self, *args, **kwargs) -> DagRunInfo | None:
+ # This function is complex to implement, for now we delegate
deserialize the dag and delegate to that.
+ return self._real_dag.next_dagrun_info(*args, **kwargs)
+
+ @cached_property
+ def _real_dag(self):
+ return SerializedDAG.from_dict(self.data)
+
+ def __getattr__(self, name: str, /) -> Any:
+ if name in self.NULLABLE_PROPERTIES:
+ return self.data["dag"].get(name)
+ try:
+ return self.data["dag"][name]
+ except KeyError:
+ raise AttributeError(f"{type(self).__name__!r} object has no
attribute {name!r}") from None
+
+ @property
+ def timetable(self) -> Timetable:
+ return decode_timetable(self.data["dag"]["timetable"])
+
+ @property
+ def has_task_concurrency_limits(self) -> bool:
+ return any(task.get("max_active_tis_per_dag") is not None for task in
self.data["dag"]["tasks"])
+
+ def get_task_assets(
+ self,
+ inlets: bool = True,
+ outlets: bool = True,
+ of_type: type[AssetT] = Asset, # type: ignore[assignment]
+ ) -> Generator[tuple[str, AssetT], None, None]:
Review Comment:
Worth adding a comment explaining why we need a `get_task_assets` method
here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org