yi-cheng-chen-taiwan opened a new issue, #41523:
URL: https://github.com/apache/airflow/issues/41523

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.9.0
   
   ### What happened?
   
   I want to build a custom timetable plugin that reads holidays from Variables stored in the metadata database.
   Fetching the data from the Variable works.
   However, an exception is thrown from dag.py and the DAG fails to register.
   
   > Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py",
 line 182, in _run_file_processor
       _handle_dag_file_processing()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py",
 line 163, in _handle_dag_file_processing
       result: tuple[int, int] = dag_file_processor.process_file(
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 79, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py",
 line 859, in process_file
       serialize_errors = DagFileProcessor.save_dag_to_db(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/api_internal/internal_api_call.py",
 line 115, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 79, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py",
 line 895, in save_dag_to_db
       import_errors = DagBag._sync_to_db(dags=dags, 
processor_subdir=dag_directory, session=session)
                       
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 76, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagbag.py", 
line 659, in _sync_to_db
       for attempt in run_with_db_retries(logger=log):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
347, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
314, in iter
       return fut.result()
              ^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in 
result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in 
__get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagbag.py", 
line 675, in _sync_to_db
       DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, 
session=session)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 76, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dag.py", line 
3191, in bulk_write_to_db
       orm_dag_links = orm_dag.dag_owner_links or []
                       ^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py",
 line 487, in __get__
       return self.impl.get(state, dict_)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py",
 line 959, in get
       value = self._fire_loader_callables(state, key, passive)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py",
 line 995, in _fire_loader_callables
       return self.callable_(state, passive)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/strategies.py",
 line 863, in _load_for_state
       raise orm_exc.DetachedInstanceError(
   sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DagModel at 
0x7f579f911010> is not bound to a Session; lazy load operation of attribute 
'dag_owner_links' cannot proceed (Background on this error at: 
https://sqlalche.me/e/14/bhk3)
   
   
   Test DAG code:
   ```
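   import datetime

   from airflow import DAG
   from airflow.operators.empty import EmptyOperator

   # Hypothetical module name; adjust to wherever the plugin file lives.
   from custom_timetable_plugin import CustomTimetable

   with DAG(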
       dag_id="test",
       description = 'Testing',
       start_date = datetime.datetime(2024, 8, 1, 0, 0),
       schedule=CustomTimetable(),
       catchup=False
   ) as dag:
       start = EmptyOperator(task_id="start")
       end = EmptyOperator(task_id="end")
   
   start >> end
   ```
   Plugin code:
   ```
   class CustomTimetable(Timetable):
       def test_variable(self):
           holiday = Variable.get("HolidaysFor2024")
           print("Test: " + holiday)
   ```
   
   I still get the same error if I use a session directly:
   ```
   class CustomTimetable(Timetable):
       def test_variable(self):
           with create_session() as session:
               holiday = session.scalar(
                   select(Variable).where(Variable.key == "HolidaysFor2024").limit(1)
               )
               print("Test: " + holiday.get_val())
   ```
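
Both snippets above look up the Variable from inside the timetable. A possible way to avoid that lookup, assuming the error is tied to it (the report suggests this but does not confirm it), is to hand the holidays to the timetable up front and carry them through the `serialize()`/`deserialize()` pair that `Timetable` subclasses can implement. The sketch below is a plain-Python stand-in so it runs without Airflow; `HolidayCalendar`, the ISO date format, and the sample dates are assumptions for illustration, not part of the original plugin. Where the list comes from (a constant, a file, a lookup done elsewhere) is left open.

```python
from __future__ import annotations

import datetime as dt
import json
from typing import Any


class HolidayCalendar:
    """Stand-in for the holiday state a custom Timetable could carry.

    Hypothetical helper, not part of Airflow. It mirrors the
    serialize()/deserialize() method pair of Timetable subclasses so the
    holidays travel with the object instead of being queried each time.
    """

    def __init__(self, holidays: list[str]) -> None:
        # Assumption: holidays are given as ISO "YYYY-MM-DD" strings.
        self.holidays = frozenset(dt.date.fromisoformat(h) for h in holidays)

    def is_holiday(self, day: dt.date) -> bool:
        return day in self.holidays

    def serialize(self) -> dict[str, Any]:
        # Only JSON-friendly types, sorted so the result is stable.
        return {"holidays": sorted(d.isoformat() for d in self.holidays)}

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> "HolidayCalendar":
        return cls(data["holidays"])


# Round-trip through JSON, as a serialized DAG would.
calendar = HolidayCalendar(["2024-10-10", "2024-01-01"])
restored = HolidayCalendar.deserialize(json.loads(json.dumps(calendar.serialize())))
```

In a real plugin the same three methods would sit on the `Timetable` subclass, and `next_dagrun_info` would consult `self.holidays` without touching a session.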
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   ```
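   from __future__ import annotations

   from pendulum import UTC, DateTime

   from airflow.models import Variable
   from airflow.plugins_manager import AirflowPlugin
   from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

   class CustomTimetable(Timetable):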
       def test_variable(self):
           holiday = Variable.get("HolidaysFor2024")
           print("Test: " + holiday)
   
       def infer_manual_data_interval(
               self,
               run_after: DateTime
       ) -> DataInterval:
           return DataInterval(start=run_after, end=run_after)
   
       def next_dagrun_info(
           self,
           *,
           last_automated_data_interval: DataInterval | None,
           restriction: TimeRestriction,
       ) -> DagRunInfo | None:
   
           self.test_variable()
   
           now = DateTime.utcnow().replace(tzinfo=UTC)
           return DagRunInfo.interval(start=now, end=now)
   
   class CustomTimetablePlugin(AirflowPlugin):
       name = "custom_timetable_plugin"
       timetables = [CustomTimetable]
   
   ```
   or
   ```
   class CustomTimetable(Timetable):
       def test_variable(self):
           with create_session() as session:
               holiday = session.scalar(
                   select(Variable).where(Variable.key == "HolidaysFor2024").limit(1)
               )
               print("Test: " + holiday.get_val())
   
      ....
   ```
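
For context on what the timetable is meant to compute once the holidays are available, here is a minimal sketch of the date logic with no database access. The function name `next_working_day`, the rule that weekends are also skipped, and the sample holiday are assumptions for illustration; the reproduction above only returns the current time.

```python
import datetime as dt


def next_working_day(after: dt.date, holidays: frozenset[dt.date]) -> dt.date:
    """Return the first day strictly after `after` that is neither a
    weekend day nor listed in `holidays`."""
    day = after + dt.timedelta(days=1)
    # date.weekday() returns 5 for Saturday and 6 for Sunday.
    while day.weekday() >= 5 or day in holidays:
        day += dt.timedelta(days=1)
    return day


# Sample data: 2024-10-09 is a Wednesday and the 10th is treated as a holiday.
holidays = frozenset({dt.date(2024, 10, 10)})
result = next_working_day(dt.date(2024, 10, 9), holidays)
```

A `next_dagrun_info` implementation could call a helper like this on the end of `last_automated_data_interval` to pick the next run date.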
   
   ### Operating System
   
   linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

