jscheffl opened a new pull request, #41082:
URL: https://github.com/apache/airflow/pull/41082
Related: #41067
Attempted to fix `tests/core/test_core.py` but did not make it.
Nevertheless 50% of progress is within this PR: The dag_maker needs to
commit changes such that tests fetching data from internal API can find it
(ACID in DB needs to commit, else a different connection does not see INSERTs).
Also the internal API loading task instance details failed in loading
details from DAG, switched to DagBag.
Afterwards, still the test fails in spaghetti with the following stack - I
am not sure how to fix this, I fear there is a new cut as internal API needed
but I am unsure on which function. Either I need a hint or somebody needs to
make this in a follow-up PR:
```
____________________________ TestCore.test_timeout
_____________________________
self = <tests.core.test_core.TestCore object at 0x7568aaf613d0>
dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at
0x7568686dfe80>
def test_timeout(self, dag_maker):
def sleep_and_catch_other_exceptions():
try:
sleep(5)
# Catching Exception should NOT catch AirflowTaskTimeout
except Exception:
pass
with dag_maker(serialized=True):
op = PythonOperator(
task_id="test_timeout",
execution_timeout=timedelta(seconds=1),
python_callable=sleep_and_catch_other_exceptions,
)
dag_maker.create_dagrun()
with pytest.raises(AirflowTaskTimeout):
> op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
tests/core/test_core.py:89:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
airflow/utils/session.py:97: in wrapper
return func(*args, session=session, **kwargs)
airflow/models/baseoperator.py:1519: in run
ti.run(
airflow/utils/session.py:94: in wrapper
return func(*args, **kwargs)
airflow/models/taskinstance.py:3192: in run
self._run_raw_task(
airflow/utils/session.py:94: in wrapper
return func(*args, **kwargs)
airflow/models/taskinstance.py:2959: in _run_raw_task
return _run_raw_task(
airflow/models/taskinstance.py:244: in _run_raw_task
ti.refresh_from_db(session=session)
airflow/utils/session.py:94: in wrapper
return func(*args, **kwargs)
airflow/models/taskinstance.py:2302: in refresh_from_db
_refresh_from_db(task_instance=self, session=session,
lock_for_update=lock_for_update)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
def _refresh_from_db(
*,
task_instance: TaskInstance | TaskInstancePydantic,
session: Session | None = None,
lock_for_update: bool = False,
) -> None:
"""
Refresh the task instance from the database based on the primary key.
:param task_instance: the task instance
:param session: SQLAlchemy ORM Session
:param lock_for_update: if True, indicates that the database should
lock the TaskInstance (issuing a FOR UPDATE clause) until the
session is committed.
:meta private:
"""
> if session and task_instance in session:
E TypeError: argument of type 'TracebackSessionForTests' is not
iterable
airflow/models/taskinstance.py:848: TypeError
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]