yi-cheng-chen-taiwan opened a new issue, #41523:
URL: https://github.com/apache/airflow/issues/41523
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.9.0
### What happened?
I want to build a custom timetable plugin that reads holidays from Airflow
Variables stored in the metadata database.
The data can be fetched from the Variable.
However, an exception is thrown from dag.py and the DAG fails to
register.
> Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 182, in _run_file_processor
    _handle_dag_file_processing()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 163, in _handle_dag_file_processing
    result: tuple[int, int] = dag_file_processor.process_file(
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 859, in process_file
    serialize_errors = DagFileProcessor.save_dag_to_db(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/api_internal/internal_api_call.py", line 115, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 895, in save_dag_to_db
    import_errors = DagBag._sync_to_db(dags=dags, processor_subdir=dag_directory, session=session)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagbag.py", line 659, in _sync_to_db
    for attempt in run_with_db_retries(logger=log):
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagbag.py", line 675, in _sync_to_db
    DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dag.py", line 3191, in bulk_write_to_db
    orm_dag_links = orm_dag.dag_owner_links or []
                    ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py", line 487, in __get__
    return self.impl.get(state, dict_)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py", line 959, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py", line 995, in _fire_loader_callables
    return self.callable_(state, passive)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/strategies.py", line 863, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DagModel at 0x7f579f911010> is not bound to a Session; lazy load operation of attribute 'dag_owner_links' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
Test DAG code
```
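import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# CustomTimetable is the timetable class from the plugin code.
with DAG(
    dag_id="test",
    description="Testing",
    start_date=datetime.datetime(2024, 8, 1, 0, 0),
    schedule=CustomTimetable(),
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")
    start >> end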
```
Plugin code
```
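from airflow.models import Variable
from airflow.timetables.base import Timetable

class CustomTimetable(Timetable):
    def test_variable(self):
        # Read the holiday list from an Airflow Variable in the metadata database.
        holiday = Variable.get("HolidaysFor2024")
        print("Test: " + holiday)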
```
I still get the same error if I use a session directly.
```
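from airflow.models import Variable
from airflow.timetables.base import Timetable
from airflow.utils.session import create_session
from sqlalchemy import select

class CustomTimetable(Timetable):
    def test_variable(self):
        with create_session() as session:
            holiday = session.scalar(
                select(Variable).where(Variable.key == "HolidaysFor2024").limit(1)
            )
            print("Test: " + holiday.get_val())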
```
### What you think should happen instead?
_No response_
### How to reproduce
```
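from airflow.models import Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import UTC, DateTime


class CustomTimetable(Timetable):
    def test_variable(self):
        # Database access from inside the timetable.
        holiday = Variable.get("HolidaysFor2024")
        print("Test: " + holiday)

    def infer_manual_data_interval(
        self,
        run_after: DateTime
    ) -> DataInterval:
        return DataInterval(start=run_after, end=run_after)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # The Variable lookup runs every time the next DAG run is computed.
        self.test_variable()
        now = DateTime.utcnow().replace(tzinfo=UTC)
        return DagRunInfo.interval(start=now, end=now)


class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [CustomTimetable]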
```
or
```
from airflow.utils.session import create_session
from sqlalchemy import select


class CustomTimetable(Timetable):
    def test_variable(self):
        with create_session() as session:
            holiday = session.scalar(
                select(Variable).where(Variable.key == "HolidaysFor2024").limit(1)
            )
            print("Test: " + holiday.get_val())

    # ....
```
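For reference, the holiday check itself can be sketched independently of Airflow, which may help separate the scheduling logic from the database access that fails in the examples above. This is only a sketch under assumptions: the value stored in `HolidaysFor2024` is assumed to be a JSON list of ISO dates (the report does not show its format), and `parse_holidays` and `next_working_day` are hypothetical helper names, not part of Airflow.

```python
import datetime
import json


def parse_holidays(raw: str) -> set[datetime.date]:
    """Parse a JSON list of ISO dates (assumed format of the Variable value)."""
    return {datetime.date.fromisoformat(item) for item in json.loads(raw)}


def next_working_day(day: datetime.date, holidays: set[datetime.date]) -> datetime.date:
    """Return the first date on or after `day` that is not a holiday."""
    while day in holidays:
        day += datetime.timedelta(days=1)
    return day


# Hypothetical value standing in for Variable.get("HolidaysFor2024").
raw_value = '["2024-08-01", "2024-08-02"]'
holidays = parse_holidays(raw_value)
print(next_working_day(datetime.date(2024, 8, 1), holidays))
```

With logic like this kept in plain functions, the timetable would only need the raw string, however it ends up being obtained.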
### Operating System
Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other Docker-based deployment
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)