snjypl opened a new issue, #28391:
URL: https://github.com/apache/airflow/issues/28391
### Apache Airflow version
main (development)
### What happened
Manually triggering a DAG fails with the Kubernetes executor, producing the following error:
```
[2022-12-15 20:05:38,442] {app.py:1741} ERROR - Exception on /run [POST]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for integer: "manual"
LINE 3: ...ate = 'queued' AND task_instance.queued_by_job_id = 'manual'
^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 2525, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 1822, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 1820, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.10/site-packages/flask/app.py", line 1796, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/www/auth.py", line 47, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/www/decorators.py", line 125, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/www/views.py", line 1896, in run
    executor.start()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/kubernetes_executor.py", line 586, in start
    self.clear_not_launched_queued_tasks()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/kubernetes_executor.py", line 510, in clear_not_launched_queued_tasks
    queued_tis: list[TaskInstance] = query.all()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2773, in all
    return self._iter().all()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
    result = self.session.execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for integer: "manual"
LINE 3: ...ate = 'queued' AND task_instance.queued_by_job_id = 'manual'
^
```
### What you think should happen instead
It should be possible to trigger the DAG manually.
### How to reproduce
Deploy the main branch with the Kubernetes executor and a Postgres database, then trigger a DAG manually.
### Operating System
Ubuntu 20
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
Python version: 3.10.9
Airflow version: 2.6.0.dev0
### Anything else
The issue is caused by this check, which filters `task_instance.queued_by_job_id` (an integer column) against the string "manual", as the SQL in the traceback shows:
https://github.com/apache/airflow/blob/b263dbcb0f84fd9029591d1447a7c843cb970f15/airflow/executors/kubernetes_executor.py#L505-L507
`celery_executor` has a similar check, but I believe it is not
called at ti execution time. Also, because it is wrapped in a try/except, the
exception is not visible.
https://github.com/apache/airflow/blob/b263dbcb0f84fd9029591d1447a7c843cb970f15/airflow/executors/celery_executor.py#L394-L412
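
One possible guard, sketched below, is to skip the queued-task cleanup whenever the executor's job id cannot be interpreted as an integer. This is only an illustration of the idea, not the actual Airflow code or a proposed patch: the helper names `coerce_job_id` and `queued_ti_filters` are hypothetical, and the filters are modelled as a plain dict rather than a SQLAlchemy query.

```python
from __future__ import annotations


def coerce_job_id(job_id: object) -> int | None:
    """Return job_id as an int, or None if it is not integer-like (e.g. "manual").

    Hypothetical helper, not part of Airflow.
    """
    # bool is a subclass of int; treat it as invalid rather than as 0/1.
    if isinstance(job_id, bool):
        return None
    if isinstance(job_id, int):
        return job_id
    if isinstance(job_id, str):
        try:
            return int(job_id)
        except ValueError:
            return None
    return None


def queued_ti_filters(job_id: object) -> dict[str, object] | None:
    """Build the filters for the cleanup query, or None to skip the query.

    Returning None for a non-integer job id avoids sending a string such as
    "manual" to an integer column, which is what Postgres rejects above.
    """
    numeric_id = coerce_job_id(job_id)
    if numeric_id is None:
        return None
    return {"state": "queued", "queued_by_job_id": numeric_id}


if __name__ == "__main__":
    print(queued_ti_filters("manual"))
    print(queued_ti_filters(42))
```

With this shape, a caller would run the cleanup query only when `queued_ti_filters(...)` returns a dict. Whether skipping the cleanup is the right behaviour for manually triggered runs is a design question for the maintainers.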
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)