kaxil commented on code in PR #44898:
URL: https://github.com/apache/airflow/pull/44898#discussion_r1883948341


##########
airflow/dag_processing/collection.py:
##########
@@ -163,6 +168,178 @@ def _update_dag_owner_links(dag_owner_links: dict[str, 
str], dm: DagModel, *, se
     )
 
 
+def _serialize_dag_capturing_errors(dag, session, processor_subdir):

Review Comment:
   Worth adding type hints to this function's signature.



##########
airflow/dag_processing/collection.py:
##########
@@ -163,6 +168,178 @@ def _update_dag_owner_links(dag_owner_links: dict[str, 
str], dm: DagModel, *, se
     )
 
 
+def _serialize_dag_capturing_errors(dag, session, processor_subdir):
+    """
+    Try to serialize the dag to the DB, but make a note of any errors.
+
+    We can't place them directly in import_errors, as this may be retried, and 
work the next time
+    """
+    from airflow import settings
+    from airflow.configuration import conf
+    from airflow.models.dagcode import DagCode
+    from airflow.models.serialized_dag import SerializedDagModel
+
+    try:
+        # We can't use bulk_write_to_db as we want to capture each error 
individually
+        dag_was_updated = SerializedDagModel.write_dag(
+            dag,
+            min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
+            session=session,
+            processor_subdir=processor_subdir,
+        )
+        if dag_was_updated:
+            _sync_dag_perms(dag, session=session)
+        else:
+            # Check and update DagCode
+            DagCode.update_source_code(dag)
+        return []
+    except OperationalError:
+        raise
+    except Exception:
+        log.exception("Failed to write serialized DAG: %s", dag.fileloc)

Review Comment:
   Do we want to log the DAG id, the DAG file path, or both? The message says "DAG" but the value logged is `fileloc`.



##########
airflow/dag_processing/collection.py:
##########
@@ -163,6 +168,178 @@ def _update_dag_owner_links(dag_owner_links: dict[str, 
str], dm: DagModel, *, se
     )
 
 
+def _serialize_dag_capturing_errors(dag, session, processor_subdir):
+    """
+    Try to serialize the dag to the DB, but make a note of any errors.
+
+    We can't place them directly in import_errors, as this may be retried, and 
work the next time
+    """
+    from airflow import settings
+    from airflow.configuration import conf
+    from airflow.models.dagcode import DagCode
+    from airflow.models.serialized_dag import SerializedDagModel
+
+    try:
+        # We can't use bulk_write_to_db as we want to capture each error 
individually
+        dag_was_updated = SerializedDagModel.write_dag(
+            dag,
+            min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
+            session=session,
+            processor_subdir=processor_subdir,
+        )
+        if dag_was_updated:
+            _sync_dag_perms(dag, session=session)
+        else:
+            # Check and update DagCode
+            DagCode.update_source_code(dag)
+        return []
+    except OperationalError:
+        raise
+    except Exception:
+        log.exception("Failed to write serialized DAG: %s", dag.fileloc)
+        dagbag_import_error_traceback_depth = conf.getint("core", 
"dagbag_import_error_traceback_depth")
+        return [(dag.fileloc, 
traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
+
+
+def _sync_dag_perms(dag: DAG, session: Session):
+    """Sync DAG specific permissions."""
+    dag_id = dag.dag_id
+
+    log.debug("Syncing DAG permissions: %s to the DB", dag_id)
+    from airflow.www.security_appless import ApplessAirflowSecurityManager
+
+    security_manager = ApplessAirflowSecurityManager(session=session)
+    security_manager.sync_perm_for_dag(dag_id, dag.access_control)
+
+
+def _update_dag_warnings(dag_ids: list[str], warnings: set[DagWarning], 
session: Session):
+    from airflow.models.dagwarning import DagWarning
+
+    stored_warnings = set(
+        session.scalars(
+            select(DagWarning).where(
+                DagWarning.dag_id.in_(dag_ids),
+                # TODO: Previously this removed only 
DagWarningType.NONEXISTENT_POOL -- is it safe to remove
+                # everything?
+            )
+        )
+    )

Review Comment:
   I don't think it is safe. The other type is `ASSET_CONFLICT`, which is handled in a different place and should not go away with the file.



##########
tests/dag_processing/test_processor.py:
##########
@@ -246,183 +243,6 @@ def test_process_file_should_failure_callback(self, 
monkeypatch, tmp_path, get_t
         msg = " ".join([str(k) for k in ti.key.primary]) + " fired callback"
         assert msg in callback_file.read_text()
 
-    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
-    def 
test_add_unparseable_file_before_sched_start_creates_import_error(self, 
tmp_path):
-        unparseable_filename = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
-        with open(unparseable_filename, "w") as unparseable_file:
-            unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
-
-        with create_session() as session:
-            self._process_file(unparseable_filename, dag_directory=tmp_path, 
session=session)
-            import_errors = session.query(ParseImportError).all()
-
-            assert len(import_errors) == 1
-            import_error = import_errors[0]
-            assert import_error.filename == unparseable_filename
-            assert import_error.stacktrace == f"invalid syntax 
({TEMP_DAG_FILENAME}, line 1)"
-            session.rollback()
-
-    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
-    def test_add_unparseable_zip_file_creates_import_error(self, tmp_path):
-        zip_filename = (tmp_path / "test_zip.zip").as_posix()
-        invalid_dag_filename = os.path.join(zip_filename, TEMP_DAG_FILENAME)
-        with ZipFile(zip_filename, "w") as zip_file:
-            zip_file.writestr(TEMP_DAG_FILENAME, UNPARSEABLE_DAG_FILE_CONTENTS)
-
-        with create_session() as session:
-            self._process_file(zip_filename, dag_directory=tmp_path, 
session=session)
-            import_errors = session.query(ParseImportError).all()
-
-            assert len(import_errors) == 1
-            import_error = import_errors[0]
-            assert import_error.filename == invalid_dag_filename
-            assert import_error.stacktrace == f"invalid syntax 
({TEMP_DAG_FILENAME}, line 1)"
-            session.rollback()

Review Comment:
   Yeah, worth keeping it.



##########
tests/dag_processing/test_processor.py:
##########
@@ -246,183 +243,6 @@ def test_process_file_should_failure_callback(self, 
monkeypatch, tmp_path, get_t
         msg = " ".join([str(k) for k in ti.key.primary]) + " fired callback"
         assert msg in callback_file.read_text()
 
-    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
-    def 
test_add_unparseable_file_before_sched_start_creates_import_error(self, 
tmp_path):
-        unparseable_filename = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
-        with open(unparseable_filename, "w") as unparseable_file:
-            unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
-
-        with create_session() as session:
-            self._process_file(unparseable_filename, dag_directory=tmp_path, 
session=session)
-            import_errors = session.query(ParseImportError).all()
-
-            assert len(import_errors) == 1
-            import_error = import_errors[0]
-            assert import_error.filename == unparseable_filename
-            assert import_error.stacktrace == f"invalid syntax 
({TEMP_DAG_FILENAME}, line 1)"
-            session.rollback()
-
-    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
-    def test_add_unparseable_zip_file_creates_import_error(self, tmp_path):
-        zip_filename = (tmp_path / "test_zip.zip").as_posix()
-        invalid_dag_filename = os.path.join(zip_filename, TEMP_DAG_FILENAME)
-        with ZipFile(zip_filename, "w") as zip_file:
-            zip_file.writestr(TEMP_DAG_FILENAME, UNPARSEABLE_DAG_FILE_CONTENTS)
-
-        with create_session() as session:
-            self._process_file(zip_filename, dag_directory=tmp_path, 
session=session)
-            import_errors = session.query(ParseImportError).all()
-
-            assert len(import_errors) == 1
-            import_error = import_errors[0]
-            assert import_error.filename == invalid_dag_filename
-            assert import_error.stacktrace == f"invalid syntax 
({TEMP_DAG_FILENAME}, line 1)"
-            session.rollback()
-
-    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
-    def test_dag_model_has_import_error_is_true_when_import_error_exists(self, 
tmp_path, session):
-        dag_file = os.path.join(TEST_DAGS_FOLDER, 
"test_example_bash_operator.py")
-        temp_dagfile = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
-        with open(dag_file) as main_dag, open(temp_dagfile, "w") as next_dag:
-            for line in main_dag:
-                next_dag.write(line)
-        # first we parse the dag
-        self._process_file(temp_dagfile, dag_directory=tmp_path, 
session=session)
-        # assert DagModel.has_import_errors is false
-        dm = session.query(DagModel).filter(DagModel.fileloc == 
temp_dagfile).first()
-        assert not dm.has_import_errors
-        # corrupt the file
-        with open(temp_dagfile, "a") as file:
-            file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
-
-        self._process_file(temp_dagfile, dag_directory=tmp_path, 
session=session)
-        import_errors = session.query(ParseImportError).all()
-
-        assert len(import_errors) == 1
-        import_error = import_errors[0]
-        assert import_error.filename == temp_dagfile
-        assert import_error.stacktrace
-        dm = session.query(DagModel).filter(DagModel.fileloc == 
temp_dagfile).first()
-        assert dm.has_import_errors
-
-    def test_no_import_errors_with_parseable_dag(self, tmp_path):
-        parseable_filename = tmp_path / TEMP_DAG_FILENAME
-        parseable_filename.write_text(PARSEABLE_DAG_FILE_CONTENTS)
-
-        with create_session() as session:
-            self._process_file(parseable_filename.as_posix(), 
dag_directory=tmp_path, session=session)
-            import_errors = session.query(ParseImportError).all()
-
-            assert len(import_errors) == 0
-
-            session.rollback()
-
-    def test_no_import_errors_with_parseable_dag_in_zip(self, tmp_path):

Review Comment:
   Worth keeping this one — it covers the zipfile case. Or do we check that somewhere else too?



##########
tests/dag_processing/test_collection.py:
##########
@@ -129,3 +154,235 @@ def test_add_asset_trigger_references(self, is_active, 
is_paused, expected_num_t
 
             assert session.query(Trigger).count() == expected_num_triggers
             assert session.query(asset_trigger_association_table).count() == 
expected_num_triggers
+
+
[email protected]_test
+class TestUpdateDagParsingResults:
+    """Tests centred around the ``update_dag_parsing_results_in_db`` 
function."""
+
+    @pytest.fixture
+    def clean_db(self, session):
+        yield
+        clear_db_serialized_dags()
+        clear_db_dags()
+        clear_db_import_errors()
+
+    @pytest.mark.usefixtures("clean_db")  # sync_perms in fab has bad session 
commit hygiene
+    def test_sync_perms_syncs_dag_specific_perms_on_update(
+        self, monkeypatch, spy_agency: SpyAgency, session, time_machine
+    ):
+        """
+        Test that dagbag.sync_to_db will sync DAG specific permissions when a 
DAG is
+        new or updated
+        """
+        from airflow import settings
+
+        serialized_dags_count = 
session.query(func.count(SerializedDagModel.dag_id)).scalar()
+        assert serialized_dags_count == 0
+
+        monkeypatch.setattr(settings, "MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
+        time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False)
+
+        dag = DAG(dag_id="test")
+
+        sync_perms_spy = spy_agency.spy_on(
+            airflow.dag_processing.collection._sync_dag_perms,
+            call_original=False,

Review Comment:
   TIL -- so this is like an actual mock



##########
tests/api_connexion/endpoints/test_task_endpoint.py:
##########
@@ -86,7 +86,7 @@ def setup_dag(self, configured_app):
             mapped_dag.dag_id: mapped_dag,
             unscheduled_dag.dag_id: unscheduled_dag,
         }
-        DagBag._sync_to_db(dag_bag.dags)

Review Comment:
   :D 



##########
airflow/dag_processing/collection.py:
##########
@@ -163,6 +168,178 @@ def _update_dag_owner_links(dag_owner_links: dict[str, 
str], dm: DagModel, *, se
     )
 
 
+def _serialize_dag_capturing_errors(dag, session, processor_subdir):
+    """
+    Try to serialize the dag to the DB, but make a note of any errors.
+
+    We can't place them directly in import_errors, as this may be retried, and 
work the next time
+    """
+    from airflow import settings
+    from airflow.configuration import conf
+    from airflow.models.dagcode import DagCode
+    from airflow.models.serialized_dag import SerializedDagModel
+
+    try:
+        # We can't use bulk_write_to_db as we want to capture each error 
individually
+        dag_was_updated = SerializedDagModel.write_dag(
+            dag,
+            min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
+            session=session,
+            processor_subdir=processor_subdir,
+        )
+        if dag_was_updated:
+            _sync_dag_perms(dag, session=session)
+        else:
+            # Check and update DagCode
+            DagCode.update_source_code(dag)
+        return []
+    except OperationalError:
+        raise
+    except Exception:
+        log.exception("Failed to write serialized DAG: %s", dag.fileloc)
+        dagbag_import_error_traceback_depth = conf.getint("core", 
"dagbag_import_error_traceback_depth")
+        return [(dag.fileloc, 
traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
+
+
+def _sync_dag_perms(dag: DAG, session: Session):
+    """Sync DAG specific permissions."""
+    dag_id = dag.dag_id
+
+    log.debug("Syncing DAG permissions: %s to the DB", dag_id)
+    from airflow.www.security_appless import ApplessAirflowSecurityManager
+
+    security_manager = ApplessAirflowSecurityManager(session=session)
+    security_manager.sync_perm_for_dag(dag_id, dag.access_control)
+
+
+def _update_dag_warnings(dag_ids: list[str], warnings: set[DagWarning], 
session: Session):
+    from airflow.models.dagwarning import DagWarning
+
+    stored_warnings = set(
+        session.scalars(
+            select(DagWarning).where(
+                DagWarning.dag_id.in_(dag_ids),
+                # TODO: Previously this removed only 
DagWarningType.NONEXISTENT_POOL -- is it safe to remove
+                # everything?
+            )
+        )
+    )
+
+    for warning_to_delete in stored_warnings - warnings:
+        session.delete(warning_to_delete)
+
+    for warning_to_add in warnings:
+        session.merge(warning_to_add)
+
+
+def _update_import_errors(
+    files_parsed: set[str], import_errors: dict[str, str], processor_subdir: 
str | None, session: Session
+):
+    from airflow.listeners.listener import get_listener_manager
+
+    # We can remove anything from files parsed in this batch that doesn't have 
an error. We need to remove old
+    # errors (i.e. form files that are removed) separately

Review Comment:
   ```suggestion
       # errors (i.e. from files that are removed) separately
   ```



##########
airflow/dag_processing/collection.py:
##########
@@ -163,6 +168,178 @@ def _update_dag_owner_links(dag_owner_links: dict[str, 
str], dm: DagModel, *, se
     )
 
 
+def _serialize_dag_capturing_errors(dag, session, processor_subdir):
+    """
+    Try to serialize the dag to the DB, but make a note of any errors.
+
+    We can't place them directly in import_errors, as this may be retried, and 
work the next time
+    """
+    from airflow import settings
+    from airflow.configuration import conf
+    from airflow.models.dagcode import DagCode
+    from airflow.models.serialized_dag import SerializedDagModel
+
+    try:
+        # We can't use bulk_write_to_db as we want to capture each error 
individually
+        dag_was_updated = SerializedDagModel.write_dag(
+            dag,
+            min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
+            session=session,
+            processor_subdir=processor_subdir,
+        )
+        if dag_was_updated:
+            _sync_dag_perms(dag, session=session)
+        else:
+            # Check and update DagCode
+            DagCode.update_source_code(dag)
+        return []
+    except OperationalError:
+        raise
+    except Exception:
+        log.exception("Failed to write serialized DAG: %s", dag.fileloc)
+        dagbag_import_error_traceback_depth = conf.getint("core", 
"dagbag_import_error_traceback_depth")
+        return [(dag.fileloc, 
traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
+
+
+def _sync_dag_perms(dag: DAG, session: Session):
+    """Sync DAG specific permissions."""
+    dag_id = dag.dag_id
+
+    log.debug("Syncing DAG permissions: %s to the DB", dag_id)
+    from airflow.www.security_appless import ApplessAirflowSecurityManager
+
+    security_manager = ApplessAirflowSecurityManager(session=session)
+    security_manager.sync_perm_for_dag(dag_id, dag.access_control)
+
+
+def _update_dag_warnings(dag_ids: list[str], warnings: set[DagWarning], 
session: Session):
+    from airflow.models.dagwarning import DagWarning
+
+    stored_warnings = set(
+        session.scalars(
+            select(DagWarning).where(
+                DagWarning.dag_id.in_(dag_ids),
+                # TODO: Previously this removed only 
DagWarningType.NONEXISTENT_POOL -- is it safe to remove
+                # everything?
+            )
+        )
+    )

Review Comment:
   
https://github.com/apache/airflow/blob/224a4e3a2fbc585840aa54e5a5048ea285c4e179/airflow/jobs/scheduler_job_runner.py#L2246-L2270



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to