rawwar commented on code in PR #56725:
URL: https://github.com/apache/airflow/pull/56725#discussion_r2438005201


##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -234,6 +238,39 @@ def _serialize_dags(
     return serialized_dags, serialization_import_errors
 
 
+def _get_dag_with_task(
+    dagbag: DagBag, dag_id: str, task_id: str | None = None
+) -> tuple[DAG, BaseOperator | MappedOperator | None]:
+    """
+    Retrieve a DAG and optionally a task from the DagBag.
+
+    :param dagbag: DagBag to retrieve from
+    :param dag_id: DAG ID to retrieve
+    :param task_id: Optional task ID to retrieve from the DAG
+    :return: tuple of (dag, task) where task is None if not requested
+    :raises ValueError: If DAG or task is not found
+    """
+    if dag_id not in dagbag.dags:
+        raise ValueError(
+            f"DAG '{dag_id}' not found in DagBag. "
+            f"This typically indicates a race condition where the DAG was 
removed or failed to parse."
+        )
+
+    dag = dagbag.dags[dag_id]
+
+    if task_id is not None:
+        try:
+            task = dag.get_task(task_id)
+            return dag, task
+        except TaskNotFound:
+            raise ValueError(
+                f"Task '{task_id}' not found in DAG '{dag_id}'. "
+                f"This typically indicates a race condition where the task was 
removed or the DAG structure changed."

Review Comment:
   @kaxil , just trying to understand why we are raising `ValueError` instead 
of `DagNotFound` or `TaskNotFound` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to