[ 
https://issues.apache.org/jira/browse/AIRFLOW-1264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813666#comment-16813666
 ] 

Andrew Stahlman edited comment on AIRFLOW-1264 at 5/21/19 12:11 AM:
--------------------------------------------------------------------

I'm able to consistently reproduce this by creating a DAG with a PythonOperator 
whose {{python_callable}} is a bound instance method.

Here's a DAG that will always trigger the exception:
{code:python}
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from copy import deepcopy

default_args = {
    'start_date': datetime(2019, 3, 11),
    'owner': 'astahlman',
}


class SomeClass():

    def some_instance_method(self, **kwargs):
        return kwargs

dag = DAG(
        'copy_issue_repro',
        schedule_interval=timedelta(days=1),
        default_args=default_args
)

op = PythonOperator(
    task_id='hi',
    provide_context=True,
    python_callable=SomeClass().some_instance_method,
    dag=dag,
)

deepcopy(dag)  # trigger the exception during DAG evaluation
{code}
Here's an even smaller example that triggers the same exception:
{code:python}
from copy import copy

class SomeClass():

    def some_instance_method(self, **kwargs):
        return kwargs

copy(SomeClass().some_instance_method)
{code}

The code that deepcopies the DAG is here, in 
[DAG.sub_dag|https://github.com/apache/airflow/blob/e1c1a8dad0e01a7baa50ebe02e748429d33241fd/airflow/models/dag.py#L1001]

 

 

 


was (Author: tronbabylove):
I'm able to consistently reproduce this by creating a DAG with a PythonOperator 
whose {{python_callable}} is a bound instance method.

Here's a DAG that will always trigger the exception:
{code:python}
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from copy import deepcopy

default_args = {
    'start_date': datetime(2019, 3, 11),
    'owner': 'astahlman',
}


class SomeClass():

    def some_instance_method(self, **kwargs):
        return kwargs

dag = DAG(
        'copy_issue_repro',
        schedule_interval=timedelta(days=1),
        default_args=default_args
)

op = PythonOperator(
    task_id='hi',
    provide_context=True,
    python_callable=SomeClass().some_instance_method,
    dag=dag,
)

deepcopy(dag)  # trigger the exception during DAG evaluation
{code}
Here's an even smaller example that triggers the same exception:
{code:python}
from copy import copy

class SomeClass():

    def some_instance_method(self, **kwargs):
        return kwargs

copy(SomeClass().some_instance_method)
{code}

The code that deepcopies the DAG is here, in 
[DAG.sub_dag|https://github.com/apache/airflow/blob/master/airflow/models/dag.py#L997]

 

 

 

> Could not clear task
> --------------------
>
>                 Key: AIRFLOW-1264
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1264
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Rui Wang
>            Priority: Major
>
> After clearing a task of a DAG, I saw the following error returned by the 
> webserver; the error could be reproduced on a specific dag_run.
> One guess is that a DAG-related file change caused this issue. I'm not sure 
> whether this issue can be reproduced again.
> {panel}
> Traceback (most recent call last):
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1817, in 
> wsgi_app
> response = self.full_dispatch_request()
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1477, in 
> full_dispatch_request
> rv = self.handle_user_exception(e)
> File 
> "/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", 
> line 103, in nr_wrapper_Flask_handle_exception
> return wrapped(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1381, in 
> handle_user_exception
> reraise(exc_type, exc_value, tb)
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1475, in 
> full_dispatch_request
> rv = self.dispatch_request()
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1461, in 
> dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
> File 
> "/usr/local/lib/python2.7/dist-packages/newrelic/hooks/framework_flask.py", 
> line 40, in nr_wrapper_handler
> return wrapped(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 68, 
> in inner
> return self._run_view(f, *args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 367, 
> in _run_view
> return fn(self, *args, **kwargs)
> File 
> "/usr/local/lib/python2.7/dist-packages/Flask_Login-0.2.11-py2.7.egg/flask_login.py",
>  line 758, in decorated_view
> return func(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 125, 
> in wrapper
> return f(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 174, 
> in wrapper
> return f(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 
> 1038, in clear
> include_upstream=upstream)
> File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 3123, 
> in sub_dag
> dag = copy.deepcopy(self)
> File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
> y = copier(memo)
> File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 3109, 
> in __deepcopy__
> setattr(result, k, copy.deepcopy(v, memo))
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
> y = copier(memo)
> File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2204, 
> in __deepcopy__
> setattr(result, k, copy.deepcopy(v, memo))
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 329, in _reconstruct
> y = callable(*args)
> File "/usr/lib/python2.7/copy_reg.py", line 93, in __newobj__
> return cls.__new__(cls, *args)
> TypeError: __new__() takes at least 2 arguments (1 given)
> {panel}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to