ephraimbuddy commented on code in PR #42913: URL: https://github.com/apache/airflow/pull/42913#discussion_r1823891050
########## tests/jobs/test_scheduler_job.py: ########## @@ -3384,57 +3381,58 @@ def test_verify_integrity_if_dag_changed(self, dag_maker): assert tis_count == 2 latest_dag_version = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session) - assert dr.dag_hash == latest_dag_version + assert dr.dag_version.serialized_dag.dag_hash == latest_dag_version session.rollback() session.close() - def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog): - # CleanUp - with create_session() as session: - session.query(SerializedDagModel).filter( - SerializedDagModel.dag_id == "test_verify_integrity_if_dag_disappeared" - ).delete(synchronize_session=False) - - with dag_maker(dag_id="test_verify_integrity_if_dag_disappeared") as dag: - BashOperator(task_id="dummy", bash_command="echo hi") - - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - - session = settings.Session() - orm_dag = dag_maker.dag_model - assert orm_dag is not None - - scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) - - self.job_runner.processor_agent = mock.MagicMock() - dag = self.job_runner.dagbag.get_dag("test_verify_integrity_if_dag_disappeared", session=session) - self.job_runner._create_dag_runs([orm_dag], session) - dag_id = dag.dag_id - drs = DagRun.find(dag_id=dag_id, session=session) - assert len(drs) == 1 - dr = drs[0] - - dag_version_1 = SerializedDagModel.get_latest_version_hash(dag_id, session=session) - assert dr.dag_hash == dag_version_1 - assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_disappeared": dag} - assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_disappeared").tasks) == 1 - - SerializedDagModel.remove_dag(dag_id=dag_id) - dag = self.job_runner.dagbag.dags[dag_id] - self.job_runner.dagbag.dags = MagicMock() - self.job_runner.dagbag.dags.get.side_effect = [dag, None] - session.flush() - with 
caplog.at_level(logging.WARNING): - callback = self.job_runner._schedule_dag_run(dr, session) - assert "The DAG disappeared before verifying integrity" in caplog.text - - assert callback is None - - session.rollback() - session.close() + # def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog): Review Comment: I will remove it. A serialized DAG can no longer disappear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org