tomyedwab opened a new issue #16762:
URL: https://github.com/apache/airflow/issues/16762


   **Apache Airflow version**: 2.0.1
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Various (GCP & local Python)
   - **OS** (e.g. from /etc/os-release): Various (linux, OSX)
   
   **What happened**:
   
   I, a certifiable idiot, accidentally passed a string into a task's `priority_weight` parameter in production.
   
   There was no error at DAG evaluation time. However, upon __running__ the task, the scheduler immediately crashed. Because it was a scheduled run, the scheduler kept restarting and immediately crashing again until the offending DAG was paused.
   
   The stack trace from my local reproduction, where the run was triggered from the web UI, is:
   ```
   Traceback (most recent call last):
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
       response = self.full_dispatch_request()
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
       reraise(exc_type, exc_value, tb)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
       raise value
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
       rv = self.dispatch_request()
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
       return self.view_functions[rule.endpoint](**req.view_args)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
       return func(*args, **kwargs)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
       return f(*args, **kwargs)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/www/views.py", line 1459, in trigger
       dag.create_dagrun(
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
       return func(*args, session=session, **kwargs)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/dag.py", line 1787, in create_dagrun
       run.verify_integrity(session=session)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
       return func(*args, **kwargs)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/dagrun.py", line 663, in verify_integrity
       ti = TI(task, self.execution_date)
     File "<string>", line 4, in __init__
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
       manager.dispatch.init_failure(self, args, kwargs)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
       compat.raise_(
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
       return manager.original_init(*mixed[1:], **kwargs)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 286, in __init__
       self.refresh_from_task(task)
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 619, in refresh_from_task
       self.priority_weight = task.priority_weight_total
     File "/Users/tomyedwab/.venv/khanflow/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 751, in priority_weight_total
       return self.priority_weight + sum(
   TypeError: can only concatenate str (not "int") to str
   ```
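The last frame shows where the `str` value finally causes trouble. `priority_weight_total` evaluates `self.priority_weight + sum(...)`, and adding an `int` to a `str` raises `TypeError`. Below is a minimal stand-alone sketch of that expression outside Airflow. `FakeTask` and its `downstream_weights` attribute are hypothetical stand-ins. The real property sums the weights of the task's relatives in the DAG rather than a fixed list.

```python
class FakeTask:
    """Hypothetical stand-in for an operator; not Airflow's BaseOperator."""

    def __init__(self, priority_weight, downstream_weights):
        self.priority_weight = priority_weight
        # Assumed weights of the related tasks that feed into the total.
        self.downstream_weights = downstream_weights

    @property
    def priority_weight_total(self):
        # Same shape as the traceback's expression: own weight + sum of others.
        return self.priority_weight + sum(self.downstream_weights)


good = FakeTask(priority_weight=1, downstream_weights=[1, 1])
print(good.priority_weight_total)

bad = FakeTask(priority_weight="X", downstream_weights=[1, 1])
try:
    bad.priority_weight_total
except TypeError as exc:
    # Same message as the last line of the traceback above.
    print(exc)
```

Even with no related tasks, `sum([])` is the integer `0`, so a `str` weight would still fail in this expression.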
   
   **What you expected to happen**:
   
   I would hope that simple mistakes like this couldn't take down the Airflow scheduler. Ideally, this type of exception would fail the task and trigger the task-failure logic, rather than relying on uptime monitoring of the scheduler process to notice the problem.
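One way to get that behaviour would be to catch errors per task while creating task instances. A single misconfigured task would then be recorded as failed, and the exception would not escape the scheduling loop. The sketch below is a simplified illustration of that idea, not Airflow's actual `verify_integrity` logic. `create_task_instances`, `TaskInstanceStub`, and the `"scheduled"`/`"failed"` state strings are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class TaskInstanceStub:
    """Hypothetical, simplified task-instance record."""
    task_id: str
    priority_weight: int
    state: str = "scheduled"


def create_task_instances(tasks):
    """Build one record per task. If a task's priority cannot be computed,
    record it as failed instead of aborting the whole loop."""
    instances = []
    for task_id, compute_priority in tasks.items():
        try:
            weight = compute_priority()
            instances.append(TaskInstanceStub(task_id, weight))
        except (TypeError, ValueError) as exc:
            # Contain the error to this task; a fuller version might also
            # log it and run the task's failure callbacks.
            print(f"task {task_id!r} failed: {exc}")
            instances.append(TaskInstanceStub(task_id, 0, state="failed"))
    return instances


tasks = {
    "ok": lambda: 1 + sum([1, 1]),
    "hello": lambda: "X" + sum([1, 1]),  # mimics the bad priority_weight
}
for ti in create_task_instances(tasks):
    print(ti.task_id, ti.state)
```

The design choice here is to catch only the narrow error types a bad parameter would plausibly raise. Unexpected failures still surface loudly.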
   
   Separately, it would be nice to have a validation check in `BaseOperator` that `priority_weight` is an integer. An invalid value would then be reported as soon as the DAG is deployed and parsed, rather than when the task is supposed to run.
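Such a check could look roughly like the sketch below. `validate_priority_weight` and `SketchOperator` are hypothetical names, not part of Airflow's API. The point is that raising in the constructor makes the error appear when the DAG file is parsed, not when the scheduler runs the task. Rejecting `bool` is an assumed design choice, since `True` is an `int` subclass in Python.

```python
def validate_priority_weight(value, task_id):
    """Raise TypeError unless value is a plain int (hypothetical helper)."""
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError(
            f"priority_weight for task {task_id!r} must be an int, "
            f"got {type(value).__name__}: {value!r}"
        )
    return value


class SketchOperator:
    """Hypothetical operator showing validation at construction time."""

    def __init__(self, task_id, priority_weight=1):
        self.task_id = task_id
        # Fails while the DAG file is being parsed, before any run is created.
        self.priority_weight = validate_priority_weight(priority_weight, task_id)


SketchOperator(task_id="ok", priority_weight=5)
try:
    SketchOperator(task_id="hello", priority_weight="X")
except TypeError as exc:
    print(exc)
```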
   
   **How to reproduce it**:
   
   I can reproduce this easily by adding a bad `priority_weight` parameter to any task, e.g.:
   
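   ```python
   from airflow.operators.python import PythonOperator

   def _print_hello():
       print("hello")

   # priority_weight should be an int; this str is accepted without error at parse time
   PythonOperator(task_id='hello', python_callable=_print_hello, priority_weight="X")
   ```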
   
   How often does this problem occur? Once? Every time etc?
   
   This problem occurs every time.
   

