arch-DJ opened a new issue #13504:
URL: https://github.com/apache/airflow/issues/13504


   **Apache Airflow version**: 2.0
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): Not relevant
   
   **Environment**: 
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release): CentOS Linux 7 (Core)
   - **Kernel** (e.g. `uname -a`): Linux us01odcres-jamuaar-0003 3.10.0-957.5.1.el7.x86_64 #1 SMP Fri Feb 1 14:54:57 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
   
   - **Install tools**: PostgreSQL 12.2
   - **Others**:
   
   **What happened**:
   
   I have two DAG files, say dag1.py and dag2.py.
   dag1.py creates a static DAG, i.e. once it is parsed it produces one specific DAG.
   dag2.py creates dynamic DAGs based on JSON files kept in an external location.
   
   The static DAG (generated from dag1.py) has a task in a later stage which generates JSON files; these get picked up by dag2.py, which creates the dynamic DAGs.
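   For reference, dag2.py does roughly the following (a minimal sketch; the config directory, JSON layout, and task are illustrative placeholders, not my exact code):
   
   ```python
   # dag2.py -- sketch of the dynamic-DAG generator. The config directory,
   # JSON structure, and the single task below are placeholders.
   import json
   from datetime import datetime
   from glob import glob
   
   from airflow import DAG
   from airflow.operators.dummy import DummyOperator
   
   for path in glob("/path/to/json/configs/*.json"):
       with open(path) as f:
           conf = json.load(f)
   
       dag = DAG(
           dag_id=conf["dag_id"],            # e.g. "dynamic_dag_1"
           start_date=datetime(2021, 1, 1),
           schedule_interval="@once",        # each dynamic DAG runs once
           is_paused_upon_creation=False,    # created unpaused
       )
       with dag:
           DummyOperator(task_id="run")
   
       # Module-level globals are how the DagBag discovers several DAGs
       # defined by a single file.
       globals()[conf["dag_id"]] = dag
   ```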
   
   The dynamic DAGs that get created are unpaused by default and get scheduled once.
   This whole process used to work fine with Airflow 1.x, where DAG serialization was not mandatory and was turned off by default.
   
   But with Airflow 2.0 I occasionally get the following exception when the dynamically generated DAGs are due to be scheduled by the scheduler.
   
   ```
   [2021-01-06 10:09:38,742] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
       self._run_scheduler_loop()
     File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
       self._create_dag_runs(query.all(), session)
     File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
       dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
     File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
       self._add_dag_from_db(dag_id=dag_id, session=session)
     File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
       raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
   airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table
   ```
   When I checked the serialized_dag table manually, I could see the DAG entry there.
   I found the last_updated column value to be **2021-01-06 10:09:38.757076+05:30**, whereas the exception was logged at **[2021-01-06 10:09:38,742]**, which is a little before the last_updated time.
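   For completeness, this is roughly how I checked it (a sketch using Airflow's own model rather than raw SQL; it assumes a shell where the Airflow metadata DB is configured):
   
   ```python
   # Sketch: look up the same row the scheduler failed to find.
   from airflow.models.serialized_dag import SerializedDagModel
   from airflow.utils.session import create_session
   
   with create_session() as session:
       row = session.query(SerializedDagModel).get("dynamic_dag_1")
       if row is not None:
           print(row.dag_id, row.last_updated)
   ```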
   
   I think this means that the scheduler tried to look up the DAG entry in the serialized_dag table before the DagFileProcessor had created it.
   
   Is this right, or could something else be going on here?
   
   **What you expected to happen**:
   
   The scheduler should start looking for the DAG entry in the serialized_dag table only after the DagFileProcessor has added it.
   Here it seems that the DagFileProcessor added the DAG entry to the **dag** table, and the scheduler immediately fetched this dag_id from there and tried to find the corresponding entry in the **serialized_dag** table before the DagFileProcessor could add it.
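   Alternatively, the lookup could tolerate the race instead of failing the whole scheduler loop. A minimal sketch of the idea, using the same DagBag call from the traceback (illustrative only, not a proposed patch):
   
   ```python
   # Sketch: treat SerializedDagNotFound as "not ready yet" and retry on
   # the next scheduler loop instead of aborting it.
   from airflow.exceptions import SerializedDagNotFound
   from airflow.models.dagbag import DagBag
   from airflow.utils.session import create_session
   
   dagbag = DagBag(read_dags_from_db=True)
   with create_session() as session:
       try:
           dag = dagbag.get_dag("dynamic_dag_1", session=session)
       except SerializedDagNotFound:
           # The `dag` row exists but the `serialized_dag` row does not
           # yet -- the window described above; skip for now.
           dag = None
   ```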
   
   **How to reproduce it**:
   It occurs occasionally and there is no well-defined way to reproduce it deterministically.
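   That said, anything that keeps minting brand-new DAG ids should widen the window. A sketch of what I would try (the directory and JSON layout are the same illustrative placeholders as above and must match whatever dag2.py reads):
   
   ```python
   # Sketch: continuously drop fresh JSON configs so dag2.py keeps creating
   # new DAG ids, increasing the chance the scheduler sees the `dag` row
   # before the `serialized_dag` row exists.
   import json
   import time
   
   for i in range(1000):
       with open(f"/path/to/json/configs/dag_{i}.json", "w") as f:
           json.dump({"dag_id": f"dynamic_dag_{i}"}, f)
       time.sleep(5)  # roughly one new DAG per parsing cycle
   ```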
   
   
   **Anything else we need to know**:
   
   
   

