tomyedwab opened a new issue #16762:
URL: https://github.com/apache/airflow/issues/16762
**Apache Airflow version**: 2.0.1
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
**Environment**:
- **Cloud provider or hardware configuration**: Various (GCP & local Python)
- **OS** (e.g. from /etc/os-release): Various (linux, OSX)
**What happened**:
I, a certifiable idiot, accidentally passed a string into a task's
`priority_weight` parameter in production.
There was no error at DAG evaluation time. However, upon __running__ the
task, the scheduler immediately crashed. Because it was a scheduled run, the
scheduler continued to restart and immediately crash until the offending DAG
was paused.
The stack trace for my local repro (triggering the DAG from the web UI) is:
```
Traceback (most recent call last):
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
    return func(*args, **kwargs)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/www/views.py", line 1459, in trigger
    dag.create_dagrun(
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/dag.py", line 1787, in create_dagrun
    run.verify_integrity(session=session)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/dagrun.py", line 663, in verify_integrity
    ti = TI(task, self.execution_date)
  File "<string>", line 4, in __init__
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 286, in __init__
    self.refresh_from_task(task)
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 619, in refresh_from_task
    self.priority_weight = task.priority_weight_total
  File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 751, in priority_weight_total
    return self.priority_weight + sum(
TypeError: can only concatenate str (not "int") to str
```
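The last frame shows where the string blows up: `priority_weight_total` adds the task's own `priority_weight` to a `sum(...)`, presumably over the weights of related tasks. Here's a hypothetical, simplified stand-in for that expression (the function signature and the `downstream_weights` list are my assumptions, not Airflow's actual code) that reproduces the same `TypeError` in plain Python:

```python
def priority_weight_total(priority_weight, downstream_weights):
    """Hypothetical simplification of the expression in the last frame:
    the task's own weight plus the summed weights of its downstream tasks."""
    return priority_weight + sum(downstream_weights)


# With integer weights this works as intended: 1 + (2 + 3).
print(priority_weight_total(1, [2, 3]))  # 6

# sum() of an empty list is the int 0, so even a task with no downstream
# tasks hits the same str + int addition as in the traceback.
try:
    priority_weight_total("X", [])
except TypeError as exc:
    print(exc)  # can only concatenate str (not "int") to str
```

Since `sum()` always starts from the int `0`, any non-numeric weight fails here regardless of the DAG's shape.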
**What you expected to happen**:
I would hope that a simple mistake like this couldn't take down the Airflow
scheduler. Ideally, this kind of exception would cause the task to fail and
trigger the task failure logic, rather than leaving us to rely on uptime
monitoring of the scheduler process to notice the crash.
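What I'm asking for is essentially per-task error isolation. A minimal, hypothetical sketch of the idea (the `Task` dataclass and `schedule_tasks` function are illustrative names I made up, not Airflow's scheduler code): the error is caught per task, so one misconfigured task fails while the others still get a priority.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Task:
    # Hypothetical minimal task record, not Airflow's BaseOperator.
    task_id: str
    priority_weight: object


def schedule_tasks(tasks: List[Task]) -> Tuple[Dict[str, int], List[str]]:
    """Compute each task's priority, isolating errors per task.

    A TypeError from one misconfigured task marks only that task as failed
    instead of propagating out of the loop and stopping every other task.
    """
    priorities: Dict[str, int] = {}
    failed: List[str] = []
    for task in tasks:
        try:
            # Own weight plus the summed weights of (here: no) downstream tasks,
            # i.e. the same str + int addition that raised in the traceback.
            priorities[task.task_id] = task.priority_weight + sum([])
        except TypeError:
            # A real scheduler would presumably also log the error and run
            # the task's failure callbacks here.
            failed.append(task.task_id)
    return priorities, failed


print(schedule_tasks([Task("hello", "X"), Task("other", 1)]))
# ({'other': 1}, ['hello'])
```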
Separately, it would be nice for BaseOperator to validate that
`priority_weight` is an integer, so that an invalid value is reported as soon
as the DAG is deployed rather than when the task is supposed to run.
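A minimal sketch of the suggested check, assuming it lives in the operator constructor so it runs whenever the DAG file is parsed. `ValidatedOperator` is a hypothetical stand-in for illustration, not Airflow's `BaseOperator`:

```python
class ValidatedOperator:
    """Hypothetical stand-in for BaseOperator; only priority_weight handling is shown."""

    def __init__(self, task_id: str, priority_weight: int = 1) -> None:
        # bool is a subclass of int, so reject it explicitly as well.
        if isinstance(priority_weight, bool) or not isinstance(priority_weight, int):
            raise TypeError(
                f"priority_weight for task {task_id!r} must be an int, "
                f"got {type(priority_weight).__name__}: {priority_weight!r}"
            )
        self.task_id = task_id
        self.priority_weight = priority_weight


ValidatedOperator(task_id="ok", priority_weight=5)  # accepted

try:
    ValidatedOperator(task_id="hello", priority_weight="X")
except TypeError as exc:
    print(exc)  # priority_weight for task 'hello' must be an int, got str: 'X'
```

Raising at construction time means the mistake would show up as a DAG import error at deploy time instead of a crash at run time.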
**How to reproduce it**:
I can reproduce this easily by adding a bad `priority_weight` parameter to
any task, e.g.:
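```python
from airflow.operators.python import PythonOperator

# Inside an existing DAG; _print_hello is any Python callable.
PythonOperator(task_id='hello', python_callable=_print_hello,
               priority_weight="X")  # a str instead of an int
```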
How often does this problem occur? Once? Every time etc?
This problem occurs every time.
--
This is an automated message from the Apache Git Service.