Shubhada Anand created AIRFLOW-6845:
---------------------------------------
Summary: Airflow scheduler throws recursion error
Key: AIRFLOW-6845
URL: https://issues.apache.org/jira/browse/AIRFLOW-6845
Project: Apache Airflow
Issue Type: Bug
Components: DAG, scheduler
Affects Versions: 1.10.7
Environment: OSX MySQL Reddis
Reporter: Shubhada Anand
In Apache-AirFlow, I have written custom BaseOperator and executing that in a
DAG, following is the code,
Operator Code :
from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators
import apply_defaults
class TestOperator(BaseOperator): template_fields = ('param1') ui_color =
'#A7E6A7'
{{@apply_defaults
def __init__(self,param1,*args, **kwargs):
self.param1 = param1
super(TestOperator, self).__init__(*args, **kwargs)
def execute(self):
print ('welcome to airflow')}}
class TestOperatorPlugin(AirflowPlugin): name = "TestOperator_plugin" operators
= [TestOperator]
DAG Code :
from TestOperator import TestOperator from airflow import DAG from datetime
import datetime
prog_args = \{ 'depends_on_past': False, 'param1' : 'testOne' }
testMYDAG = DAG('TestMYDAG', start_date = datetime(2020, 2, 18) ,
description='TestMYDAG', default_args = prog_args, schedule_interval=None)
testOp = TestOperator(task_id='test_dag', dag=testMYDAG )
testOp
----
Once I start AirFlow webserver and scheduler, I am triggering DAG "TestMYDAG"
manually. I get the following error and after this error console I don't get
back till I delete all backend table entries for job, tasks in MySQL manually.
Here is the error on Console ,
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid
start byte
The above exception was the direct cause of the following exception:
Traceback (most recent call last): File
"/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
response = self.full_dispatch_request() File
"/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in
full_dispatch_request rv = self.handle_user_exception(e) File
"/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in
handle_user_exception reraise(exc_type, exc_value, tb) File
"/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
raise value File "/usr/local/lib/python3.7/site-packages/flask/app.py", line
1949, in full_dispatch_request rv = self.dispatch_request() File
"/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in
dispatch_request return self.view_functionsrule.endpoint File
"/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 368, in
_run_view return fn(self, *args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/flask_login/utils.py", line 258, in
decorated_view return func(*args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in
wrapper return func(*args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/airflow/www/views.py", line 2280, in
index auto_complete_data=auto_complete_data) File
"/usr/local/lib/python3.7/site-packages/airflow/www/views.py", line 388, in
render return super(AirflowViewMixin, self).render(template, **kwargs) File
"/usr/local/lib/python3.7/site-packages/flask_admin/base.py", line 308, in
render return render_template(template, **kwargs) File
"/usr/local/lib/python3.7/site-packages/flask/templating.py", line 140, in
render_template ctx.app, File
"/usr/local/lib/python3.7/site-packages/flask/templating.py", line 120, in
_render rv = template.render(context) File
"/usr/local/lib/python3.7/site-packages/jinja2/asyncsupport.py", line 76, in
render return original_render(self, *args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 1008, in
render return self.environment.handle_exception(exc_info, True) File
"/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 780, in
handle_exception reraise(exc_type, exc_value, tb) File
"/usr/local/lib/python3.7/site-packages/jinja2/_compat.py", line 37, in reraise
raise value.with_traceback(tb) File
"/usr/local/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html",
line 20, in top-level template code \{% extends "airflow/master.html" %} File
"/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 1005, in
render return concat(self.root_render_func(self.new_context(vars))) File
"/usr/local/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html",
line 16, in root specific language governing permissions and limitations File
"/usr/local/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html",
line 16, in root specific language governing permissions and limitations File
"/usr/local/lib/python3.7/site-packages/airflow/www/templates/admin/master.html",
line 16, in root specific language governing permissions and limitations File
"/usr/local/lib/python3.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html",
line 26, in root File
"/usr/local/lib/python3.7/site-packages/airflow/www/templates/admin/master.html",
line 90, in block_page_body function convertSecsToHumanReadable(seconds) {
File
"/usr/local/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html",
line 79, in block_body File
"/usr/local/lib/python3.7/site-packages/jinja2/runtime.py", line 262, in call
return *obj(*args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in
wrapper return func(*args, **kwargs) File
"/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1607, in
get_last_dagrun include_externally_triggered=include_externally_triggered) File
"/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 81, in
get_last_dagrun return query.first() File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3287, in
first ret = list(self[0:1]) File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3065, in
__getitem* return list(res) File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3389, in
*iter* return self._execute_and_instances(context) File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3414, in
_execute_and_instances result = conn.execute(querycontext.statement,
self._params) File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982,
in execute return meth(self, multiparams, params) File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293,
in _execute_on_connection return connection._execute_clauseelement(self,
multiparams, params) File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101,
in _execute_clauseelement distilled_params, File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250,
in _execute_context e, statement, parameters, cursor, context File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1478,
in _handle_dbapi_exception util.reraise(*exc_info) File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153,
in reraise raise value File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1246,
in _execute_context cursor, statement, parameters, context File
"/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line
588, in do_execute cursor.execute(statement, parameters) File
"/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line
272, in execute self._handle_result(result) File
"/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line
163, in _handle_result self._handle_resultset() File
"/usr/local/lib/python3.7/site-packages/mysql/connector/cursor_cext.py", line
651, in _handle_resultset self._rows = self._cnx.get_rows()[0] File
"/usr/local/lib/python3.7/site-packages/mysql/connector/connection_cext.py",
line 318, in get_rows else self._cmysql.fetch_row() SystemError: returned a
result with an error set
----
if scheduler is still running and not stopped after some time I am getting
following error.
..... File
"/usr/local/lib/python3.7/site-packages/airflow/utils/log/logging_mixin.py",
line 112, in _propagate_log self.logger.log(self.level,
remove_escape_codes(message)) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 1444, in log self._log(level, msg, args, **kwargs) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 1514, in _log self.handle(record) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 1524, in handle self.callHandlers(record) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 1586, in callHandlers hdlr.handle(record) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 894, in handle self.emit(record) File
"/usr/local/lib/python3.7/site-packages/airflow/utils/log/file_processor_handler.py",
line 76, in emit self.handler.emit(record) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 1127, in emit StreamHandler.emit(self, record) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 1025, in emit msg = self.format(record) File
"/usr/local/Cellar/python/3.7.6_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/logging/*init*.py",
line 869, in format return fmt.format(record) RecursionError: maximum
recursion depth exceeded
Please let me know if any questions, need more clarity.
Thanks and Regards, Shubhada
--
This message was sent by Atlassian Jira
(v8.3.4#803005)