potiuk edited a comment on pull request #14792: URL: https://github.com/apache/airflow/pull/14792#issuecomment-799849287
Unfortunately, with the `__del__` solution we again have flakiness when run in parallel. There must be another reason for this. I am reverting back now to explicit scheduler stopping and will see if I can reproduce this. https://github.com/apache/airflow/pull/14531/checks?check_run_id=2117086265#step:6:10807 ``` _______________ TestSchedulerJob.test_scheduler_verify_pool_full _______________ self = <tests.jobs.test_scheduler_job.TestSchedulerJob testMethod=test_scheduler_verify_pool_full> def test_scheduler_verify_pool_full(self): """ Test task instances not queued when pool is full """ dag = DAG(dag_id='test_scheduler_verify_pool_full', start_date=DEFAULT_DATE) BashOperator( task_id='dummy', dag=dag, owner='airflow', pool='test_scheduler_verify_pool_full', bash_command='echo hi', ) dagbag = DagBag( dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"), include_examples=False, read_dags_from_db=True, ) dagbag.bag_dag(dag=dag, root_dag=dag) dagbag.sync_to_db() session = settings.Session() pool = Pool(pool='test_scheduler_verify_pool_full', slots=1) session.add(pool) session.flush() dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) SerializedDagModel.write_dag(dag) scheduler = SchedulerJob(executor=self.null_exec) scheduler.processor_agent = mock.MagicMock() # Create 2 dagruns, which will create 2 task instances. 
dr = dag.create_dagrun( run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE, state=State.RUNNING, ) > scheduler._schedule_dag_run(dr, {}, session) tests/jobs/test_scheduler_job.py:2585: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ airflow/jobs/scheduler_job.py:1707: in _schedule_dag_run dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) airflow/utils/session.py:62: in wrapper return func(*args, **kwargs) airflow/models/dagbag.py:178: in get_dag self._add_dag_from_db(dag_id=dag_id, session=session) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <airflow.models.dagbag.DagBag object at 0x7f1cf7b23760> dag_id = 'test_scheduler_verify_pool_full' session = <sqlalchemy.orm.session.Session object at 0x7f1cf781e310> def _add_dag_from_db(self, dag_id: str, session: Session): """Add DAG to DagBag from DB""" from airflow.models.serialized_dag import SerializedDagModel row = SerializedDagModel.get(dag_id, session) if not row: > raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table") ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
