ashb commented on code in PR #44898:
URL: https://github.com/apache/airflow/pull/44898#discussion_r1884006852


##########
airflow/dag_processing/collection.py:
##########
@@ -163,6 +168,178 @@ def _update_dag_owner_links(dag_owner_links: dict[str, 
str], dm: DagModel, *, se
     )
 
 
+def _serialize_dag_capturing_errors(dag, session, processor_subdir):
+    """
+    Try to serialize the dag to the DB, but make a note of any errors.
+
+    We can't place them directly in import_errors, as this may be retried, and 
work the next time
+    """
+    from airflow import settings
+    from airflow.configuration import conf
+    from airflow.models.dagcode import DagCode
+    from airflow.models.serialized_dag import SerializedDagModel
+
+    try:
+        # We can't use bulk_write_to_db as we want to capture each error 
individually
+        dag_was_updated = SerializedDagModel.write_dag(
+            dag,
+            min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
+            session=session,
+            processor_subdir=processor_subdir,
+        )
+        if dag_was_updated:
+            _sync_dag_perms(dag, session=session)
+        else:
+            # Check and update DagCode
+            DagCode.update_source_code(dag)
+        return []
+    except OperationalError:
+        raise
+    except Exception:
+        log.exception("Failed to write serialized DAG: %s", dag.fileloc)
+        dagbag_import_error_traceback_depth = conf.getint("core", 
"dagbag_import_error_traceback_depth")
+        return [(dag.fileloc, 
traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
+
+
+def _sync_dag_perms(dag: DAG, session: Session):
+    """Sync DAG specific permissions."""
+    dag_id = dag.dag_id
+
+    log.debug("Syncing DAG permissions: %s to the DB", dag_id)
+    from airflow.www.security_appless import ApplessAirflowSecurityManager
+
+    security_manager = ApplessAirflowSecurityManager(session=session)
+    security_manager.sync_perm_for_dag(dag_id, dag.access_control)
+
+
+def _update_dag_warnings(dag_ids: list[str], warnings: set[DagWarning], 
session: Session):
+    from airflow.models.dagwarning import DagWarning
+
+    stored_warnings = set(
+        session.scalars(
+            select(DagWarning).where(
+                DagWarning.dag_id.in_(dag_ids),
+                # TODO: Previously this removed only 
DagWarningType.NONEXISTENT_POOL -- is it safe to remove
+                # everything?
+            )
+        )
+    )

Review Comment:
   Hmmm yes. I wonder how to handle that, I don't really want this function to 
be type specific.... I guess it might have to take an optional list of 
DagWarningType, and if so only looks at those 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to