[ 
https://issues.apache.org/jira/browse/AIRFLOW-351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16820226#comment-16820226
 ] 

Oleg Khavronin commented on AIRFLOW-351:
----------------------------------------

We are getting the same error in the same place (adjusted for our Airflow 
version) as the one reported in this earlier comment: 
https://issues.apache.org/jira/browse/AIRFLOW-351?focusedCommentId=16330590&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16330590

We are currently using Airflow 1.10.2. The DAG default arguments contain only 
owner and email. We use the following function to clear the sub-DAG on retry:

[https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117]

 

Exception thrown in {{models.py, line 1833}}:
{code:java}
[2019-04-10 19:01:27,037] {logging_mixin.py:95} INFO - [2019-04-10 
19:01:27,037] {utils.py:97} INFO - Clearing SubDag: 
b_dags_daily_tis_dag.daily_stats_unfiltered 2019-04-03T08:00:00+00:00
[2019-04-10 19:01:27,042] {models.py:1833} ERROR - Failed at executing callback
[2019-04-10 19:01:27,053] {models.py:1834} ERROR - cannot serialize 
'_io.TextIOWrapper' object
Traceback (most recent call last):
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 1657, in _run_raw_task
    result = task_copy.execute(context=context)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/operators/subdag_operator.py",
 line 103, in execute
    executor=self.executor)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 4324, in run
    job.run()
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py", 
line 202, in run
    self._execute()
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py", 
line 73, in wrapper
    return func(*args, **kwargs)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py", 
line 2440, in _execute
    session=session)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py", 
line 69, in wrapper
    return func(*args, **kwargs)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py", 
line 2394, in _execute_for_run_dates
    session=session)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py", 
line 69, in wrapper
    return func(*args, **kwargs)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py", 
line 2284, in _process_backfill_task_instances
    executor.heartbeat()
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/executors/base_executor.py",
 line 150, in heartbeat
    self.sync()
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/executors/sequential_executor.py",
 line 48, in sync
    subprocess.check_call(command, shell=True, close_fds=True)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 266, in 
check_call
    retcode = call(*popenargs, **kwargs)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 249, in 
call
    return p.wait(timeout=timeout)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 1389, 
in wait
    (pid, sts) = self._try_wait(0)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 1339, 
in _try_wait
    (pid, sts) = os.waitpid(self.pid, wait_flags)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 1636, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 1829, in handle_failure
    task.on_retry_callback(context)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/bombora_airflow/utils.py",
 line 104, in callback_subdag_clear
    include_subdags=False)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py", 
line 73, in wrapper
    return func(*args, **kwargs)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 3987, in clear
    include_downstream=True)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 4131, in sub_dag
    dag = copy.deepcopy(self)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 166, in 
deepcopy
    y = copier(memo)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 4116, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in 
_deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 166, in 
deepcopy
    y = copier(memo)
  File 
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py", 
line 2848, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in 
deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in 
_reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in 
_deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in 
deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in 
_reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in 
_deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in 
deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in 
_reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in 
_deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in 
_deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in 
deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in 
_reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in 
_deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 218, in 
_deepcopy_list
    y.append(deepcopy(a, memo))
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in 
deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in 
_reconstruct
    state = deepcopy(state, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in 
deepcopy
    y = copier(x, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in 
_deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 174, in 
deepcopy
    rv = reductor(4)
TypeError: cannot serialize '_io.TextIOWrapper' object

{code}

> Failed to clear downstream tasks
> --------------------------------
>
>                 Key: AIRFLOW-351
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-351
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: models, subdag, webserver
>    Affects Versions: 1.7.1.3
>            Reporter: Adinata
>            Priority: Major
>         Attachments: dag_error.py, error.log, error_on_clear_dag.txt, 
> ubuntu-14-packages.log, ubuntu-16-oops.log, ubuntu-16-packages.log
>
>
> {code}
>                           ____/ (  (    )   )  \___
>                          /( (  (  )   _    ))  )   )\
>                        ((     (   )(    )  )   (   )  )
>                      ((/  ( _(   )   (   _) ) (  () )  )
>                     ( (  ( (_)   ((    (   )  .((_ ) .  )_
>                    ( (  )    (      (  )    )   ) . ) (   )
>                   (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
>                   ( (  (   ) (  )   (  ))     ) _)(   )  )  )
>                  ( (  ( \ ) (    (_  ( ) ( )  )   ) )  )) ( )
>                   (  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
>                  ( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
>                   ((  (   )(    (     _    )   _) _(_ (  (_ )
>                    (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
>                    ((__)        \\||lll|l||///          \_))
>                             (   /(/ (  )  ) )\   )
>                           (    ( ( ( | | ) ) )\   )
>                            (   /(| / ( )) ) ) )) )
>                          (     ( ((((_(|)_)))))     )
>                           (      ||\(|(|)|/||     )
>                         (        |(||(||)||||        )
>                           (     //|/l|||)|\\ \     )
>                         (/ / //  /|//||||\\  \ \  \ _)
> -------------------------------------------------------------------------------
> Node: 9889a7c79e9b
> -------------------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1817, in 
> wsgi_app
>     response = self.full_dispatch_request()
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1477, in 
> full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1381, in 
> handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1475, in 
> full_dispatch_request
>     rv = self.dispatch_request()
>   File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1461, in 
> dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 68, 
> in inner
>     return self._run_view(f, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 
> 367, in _run_view
>     return fn(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 755, in 
> decorated_view
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 
> 118, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 
> 167, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 
> 1017, in clear
>     include_upstream=upstream)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2870, 
> in sub_dag
>     dag = copy.deepcopy(self)
>   File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
>     y = copier(memo)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2856, 
> in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
>     y = copier(memo)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1974, 
> in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 230, in _deepcopy_list
>     y.append(deepcopy(a, memo))
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
>     y = copier(x, memo)
>   File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/usr/lib/python2.7/copy.py", line 329, in _reconstruct
>     y = callable(*args)
>   File "/usr/lib/python2.7/copy_reg.py", line 93, in __newobj__
>     return cls.__new__(cls, *args)
> TypeError: object.__new__(thread.lock) is not safe, use thread.lock.__new__()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to