dKosarevsky commented on PR #23338:
URL: https://github.com/apache/airflow/pull/23338#issuecomment-1207351953
Hi, maybe this example can help with testing:
```py
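import logging
from datetime import datetime, timedelta, timezone

from airflow.exceptions import AirflowSkipException
from airflow.models import TaskInstance
from airflow.utils.session import provide_session
from airflow.utils.state import State
from sqlalchemy import or_

logger = logging.getLogger(__name__)


@provide_session
def run_queued_tasks(session):
    # Select task instances that are queued (or have no state)
    # and were queued more than 20 minutes ago.
    filter_ = [
        or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
        TaskInstance.queued_dttm < datetime.now(timezone.utc) - timedelta(minutes=20),
    ]
    tis = session.query(TaskInstance).filter(*filter_).all()
    if not tis:
        raise AirflowSkipException("Skipped. Empty task instances list.")
    logger.info(f"Updating {len(tis)} task instances:")
    for ti in tis:
        logger.info(dict(
            dag_id=ti.dag_id,
            task_id=ti.task_id,
            state=ti.state,
            execution_date=ti.execution_date,
        ))
        ti.run(session=session)
    session.commit()
    logger.info("Done.")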
```
This code fails with:
```py
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/dags/repo/dags/pik_digital/pik_dags/_queued_tasks_runner/__init__.py", line 39, in run_queued_tasks
    ti.dry_run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1820, in dry_run
    self.task = self.task.prepare_for_execution()
AttributeError: 'TaskInstance' object has no attribute 'task'
```
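The `AttributeError` appears to come from the fact that `task` is not a database column, so a `TaskInstance` loaded through `session.query(...)` has no operator attached. A possible workaround, as a rough sketch only: look the operator up in a DAG bag and attach it before running. The helper name `attach_task` is hypothetical, and `dag_bag` is assumed to behave like `airflow.models.DagBag` (only `get_dag`, `has_task` and `get_task` are used).

```py
def attach_task(ti, dag_bag):
    """Attach the operator to a TaskInstance that was loaded from the database.

    Hypothetical helper, not part of Airflow. Assumes `dag_bag` exposes
    `get_dag(dag_id)` and that the returned DAG exposes `has_task(task_id)`
    and `get_task(task_id)`, as `airflow.models.DagBag` / `DAG` do.
    Returns True if a task was attached, False otherwise.
    """
    dag = dag_bag.get_dag(ti.dag_id)
    # The DAG or the task may have been removed since the row was written.
    if dag is None or not dag.has_task(ti.task_id):
        return False
    # Set the attribute that the traceback reports as missing.
    ti.task = dag.get_task(ti.task_id)
    return True
```

In the loop above this would be called as `attach_task(ti, dag_bag)` before `ti.run(session=session)`, skipping the task instance when it returns `False`. Whether running queued task instances this way is appropriate at all is a separate question.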