dotsinspace opened a new issue, #30157:
URL: https://github.com/apache/airflow/issues/30157

   ### Apache Airflow version
   
   2.5.2
   
   ### What happened
   
   NOTE: I am building an application with a ReactFlow frontend and a Python backend that uses Airflow.
   
   Please also review this question on Stack Overflow:
   
   [Airflow "This DAG isn't available in the webserver DagBag object"](https://stackoverflow.com/questions/41560614/airflow-this-dag-isnt-available-in-the-webserver-dagbag-object)
   
   Problem:
   
   I have the following code, which dynamically creates a DAG file and stores it in /Users/[username]/airflow:
   
   ```python
   #
   # IMPORTS
   #
   import os  # CORE: Operating System module for file system operations.
   import inspect  # CORE: Inspect module for getting source code of a function.
   from datetime import datetime  # CORE: Datetime module for date and time operations.
   from airflow.models import DAG, DagBag  # PIP: Airflow DAG and DagBag modules for DAG operations.
   from airflow.operators.python_operator import PythonOperator  # PIP: Airflow PythonOperator module for Python operations.
   
   
   #
   # ENVS
   #
   os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
   
   
   #
   # FUNCTIONS
   #
   def DagFunctionToExecute():
       from datetime import datetime, timedelta
       from airflow import DAG
       from airflow.operators.bash_operator import BashOperator
   
       default_args = {
           'owner': 'airflow',
           'depends_on_past': False,
           'start_date': datetime(2023, 3, 16),
           'email': ['[email protected]'],
           'email_on_failure': False,
           'email_on_retry': False,
           'retries': 1,
           'retry_delay': timedelta(minutes=5),
       }
   
       dag = DAG('Amazing', default_args=default_args, schedule_interval=timedelta(days=1))
   
       t1 = BashOperator(
           task_id='print_date',
           bash_command='date',
           dag=dag)
   
       t2 = BashOperator(
           task_id='sleep',
           bash_command='sleep 5',
           retries=3,
           dag=dag)
   
       t1 >> t2
   
   
   def CheckDagInDagBag(__id):
       # Load all DAGs from the dags folder.
       dagbag = DagBag(dag_folder=os.path.join(os.environ['AIRFLOW_HOME'], 'dags'))
   
       print(dagbag.dags, os.path.join(os.environ['AIRFLOW_HOME'], 'dags'))
   
       # Check if DAG is present in the DagBag.
       if dagbag.dags.get(__id):
           # Print statement that dag is present.
           print(f"DAG with id {__id} is present in the DagBag.")
       else:
           # Print statement that dag is not present.
           print(f"DAG with id {__id} is not present in the DagBag.")
   
   
   def CreateOperators(__dag, __edges, __functionToExecute):
       # Variable assignment.
       operators = {}
   
       # Loop over edges and convert them to operators.
       for item in __edges:
           source = item['source']
           target = item['target']
   
           # Check if source is not in operators.
           if source not in operators:
               # Update operators with source and a PythonOperator
               # wrapping the callable python function.
               operators[source] = PythonOperator(
                   task_id=source,
                   python_callable=__functionToExecute,
                   provide_context=True,
                   dag=__dag,
                   op_kwargs={'source': source, '__dagId': __dag.dag_id}
               )
   
           # Check if target is not in operators.
           if target not in operators:
               # Update operators with target and a PythonOperator.
               operators[target] = PythonOperator(
                   task_id=target,
                   python_callable=__functionToExecute,
                   provide_context=True,
                   dag=__dag,
                   op_kwargs={'source': source, '__dagId': __dag.dag_id}
               )
   
           # Set upstream of target to source.
           operators[target].set_upstream(operators[source])
   
       # Return operators.
       return operators
   
   
   def SyncDagToDb(__dag):
       # Sync DAG to database.
       __dag.sync_to_db()
   
       # Check if DAG is present in the DagBag.
       CheckDagInDagBag(__dag.dag_id)
   
   
   def WriteDagToFile(__dag, __functionToExecute):
       # Variable assignment.
       dagId = __dag.dag_id
       fileName = f"{dagId}.py"
   
       # Get source code of DAG.
       sourceCode = inspect.getsource(__functionToExecute)
   
       # Write DAG to file.
       with open(os.path.join(os.environ['AIRFLOW_HOME'], 'dags', fileName), "w", encoding='utf8') as __dagFile:
           # Write source to the file.
           __dagFile.write(sourceCode)
   
   
   def CreateDag(__edges, __title, __description):
       # Create DAG.
       dag = DAG(
           dag_id=__title,
           description=__description,
           start_date=datetime.now(),
           default_view='tree',
           schedule_interval='@daily',
           catchup=False
       )
   
       # Create operators and sync the DAG to the database;
       # alongside that, write the DAG to a file.
       CreateOperators(dag, __edges, DagFunctionToExecute)
       SyncDagToDb(dag)
       WriteDagToFile(dag, DagFunctionToExecute)
   
       # Return dag.
       return dag
   ```
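One thing worth noting (my own minimal sketch, plain Python, no Airflow required): `inspect.getsource` returns only the text of the function definition, so the file written by `WriteDagToFile` defines `DagFunctionToExecute` but never calls it, and importing that file therefore creates no module-level DAG object:

```python
import inspect
import os
import runpy
import tempfile

def DagFunctionToExecute():
    # Stand-in for the real function: it has a visible side effect
    # so we can tell whether importing the file ever runs it.
    global CALLED
    CALLED = True

# Write only the function's source, exactly as WriteDagToFile does.
source = inspect.getsource(DagFunctionToExecute)
path = os.path.join(tempfile.mkdtemp(), "Amazing.py")
with open(path, "w", encoding="utf8") as f:
    f.write(source)

# Execute the generated file the way an import of a DAG file would.
namespace = runpy.run_path(path)

# The function is defined but never called, so nothing inside it runs.
print("DagFunctionToExecute" in namespace)  # True
print("CALLED" in namespace)                # False
```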
   
   Now when I run `CreateDag`, it creates the file in the dags folder and works like a charm.
   
   Screenshot of the dags folder:
   
   <img width="918" alt="Screenshot 2023-03-17 at 3 25 49 AM" src="https://user-images.githubusercontent.com/76559517/225761133-e09b1497-7a77-423a-b622-f5faa3800938.png">
   
   and the DAG does appear in the Airflow web UI; screenshot below.
   
   <img width="1728" alt="Screenshot 2023-03-17 at 3 26 36 AM" src="https://user-images.githubusercontent.com/76559517/225761257-4d72228d-e0d9-4788-a28f-126c1f1d794c.png">
   
   The problem is that when I click on the DAG, it throws the error **DAG "Amazing" seems to be missing from the DagBag.** Screenshot below:
   
   <img width="1728" alt="Screenshot 2023-03-17 at 3 27 35 AM" src="https://user-images.githubusercontent.com/76559517/225761416-498edcee-d9ba-4288-a18e-c4dd82d104c4.png">
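For context on why the webserver might report this: as I understand it, the DagBag re-imports every file in the dags folder and collects module-level objects that look like DAGs, so a file that only *defines* a function yields nothing. A duck-typed sketch of that discovery step (the `collect_dags` helper here is hypothetical, not the real Airflow API, and no Airflow install is required):

```python
import os
import runpy
import tempfile

def collect_dags(dag_folder):
    """Roughly what DagBag does: import each .py file and keep
    any module-level object that carries a dag_id attribute."""
    dags = {}
    for name in os.listdir(dag_folder):
        if not name.endswith(".py"):
            continue
        namespace = runpy.run_path(os.path.join(dag_folder, name))
        for obj in namespace.values():
            dag_id = getattr(obj, "dag_id", None)
            if dag_id is not None:
                dags[dag_id] = obj
    return dags

folder = tempfile.mkdtemp()

# A file that only defines a function (like the generated one): no DAG is found.
with open(os.path.join(folder, "broken.py"), "w") as f:
    f.write("def DagFunctionToExecute():\n    pass\n")

# A file with a module-level DAG-like object: it is found.
with open(os.path.join(folder, "working.py"), "w") as f:
    f.write(
        "class FakeDag:\n"
        "    dag_id = 'Amazing'\n"
        "dag = FakeDag()\n"
    )

found = collect_dags(folder)
print(sorted(found))  # ['Amazing']
```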
   
   
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   <img width="747" alt="Screenshot 2023-03-17 at 3 32 59 AM" 
src="https://user-images.githubusercontent.com/76559517/225762828-bc67c845-60bc-4a55-bb88-2628a74a25da.png";>
   
   
   ### Operating System
   
   macOS Ventura
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-celery==3.1.0
   apache-airflow-providers-common-sql==1.3.4
   apache-airflow-providers-ftp==3.3.1
   apache-airflow-providers-http==4.2.0
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-sqlite==3.3.1
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   💔 This question has been open on Stack Overflow for the last 6 years and is still unresolved. Please help me out here; I don't want to move away from Airflow, because I have already spent the last 5 days building my whole server around it. Hope you guys understand.
   
   Stack Overflow: [Airflow "This DAG isn't available in the webserver DagBag object"](https://stackoverflow.com/questions/41560614/airflow-this-dag-isnt-available-in-the-webserver-dagbag-object)
   
   Regards
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

