[
https://issues.apache.org/jira/browse/AIRFLOW-351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16820226#comment-16820226
]
Oleg Khavronin commented on AIRFLOW-351:
----------------------------------------
We are getting the same error in the same place (adjusted for Airflow version)
as
https://issues.apache.org/jira/browse/AIRFLOW-351?focusedCommentId=16330590&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16330590
Currently, we are using 1.10.2. DAG default arguments only contain owner and
email. We use the following function to clear sub-dag on re-try :
[https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117]
Exception thrown in {{models.py, line 1833}}
{code:java}
[2019-04-10 19:01:27,037] {logging_mixin.py:95} INFO - [2019-04-10
19:01:27,037] {utils.py:97} INFO - Clearing SubDag:
b_dags_daily_tis_dag.daily_stats_unfiltered 2019-04-03T08:00:00+00:00
[2019-04-10 19:01:27,042] {models.py:1833} ERROR - Failed at executing callback
[2019-04-10 19:01:27,053] {models.py:1834} ERROR - cannot serialize
'_io.TextIOWrapper' object
Traceback (most recent call last):
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 1657, in _run_raw_task
result = task_copy.execute(context=context)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/operators/subdag_operator.py",
line 103, in execute
executor=self.executor)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 4324, in run
job.run()
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py",
line 202, in run
self._execute()
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py",
line 73, in wrapper
return func(*args, **kwargs)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py",
line 2440, in _execute
session=session)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py",
line 69, in wrapper
return func(*args, **kwargs)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py",
line 2394, in _execute_for_run_dates
session=session)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py",
line 69, in wrapper
return func(*args, **kwargs)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/jobs.py",
line 2284, in _process_backfill_task_instances
executor.heartbeat()
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/executors/base_executor.py",
line 150, in heartbeat
self.sync()
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/executors/sequential_executor.py",
line 48, in sync
subprocess.check_call(command, shell=True, close_fds=True)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 266, in
check_call
retcode = call(*popenargs, **kwargs)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 249, in
call
return p.wait(timeout=timeout)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 1389,
in wait
(pid, sts) = self._try_wait(0)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/subprocess.py", line 1339,
in _try_wait
(pid, sts) = os.waitpid(self.pid, wait_flags)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 1636, in signal_handler
raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 1829, in handle_failure
task.on_retry_callback(context)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/bombora_airflow/utils.py",
line 104, in callback_subdag_clear
include_subdags=False)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/utils/db.py",
line 73, in wrapper
return func(*args, **kwargs)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 3987, in clear
include_downstream=True)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 4131, in sub_dag
dag = copy.deepcopy(self)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 166, in
deepcopy
y = copier(memo)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 4116, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in
_deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 166, in
deepcopy
y = copier(memo)
File
"/opt/conda/envs/bairflow-gke/lib/python3.5/site-packages/airflow/models.py",
line 2848, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in
deepcopy
y = _reconstruct(x, rv, 1, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in
_reconstruct
state = deepcopy(state, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in
_deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in
deepcopy
y = _reconstruct(x, rv, 1, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in
_reconstruct
state = deepcopy(state, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in
_deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in
deepcopy
y = _reconstruct(x, rv, 1, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in
_reconstruct
state = deepcopy(state, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in
_deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in
_deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in
deepcopy
y = _reconstruct(x, rv, 1, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in
_reconstruct
state = deepcopy(state, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in
_deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 218, in
_deepcopy_list
y.append(deepcopy(a, memo))
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 182, in
deepcopy
y = _reconstruct(x, rv, 1, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 297, in
_reconstruct
state = deepcopy(state, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 155, in
deepcopy
y = copier(x, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 243, in
_deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/opt/conda/envs/bairflow-gke/lib/python3.5/copy.py", line 174, in
deepcopy
rv = reductor(4)
TypeError: cannot serialize '_io.TextIOWrapper' object
{code}
> Failed to clear downstream tasks
> --------------------------------
>
> Key: AIRFLOW-351
> URL: https://issues.apache.org/jira/browse/AIRFLOW-351
> Project: Apache Airflow
> Issue Type: Bug
> Components: models, subdag, webserver
> Affects Versions: 1.7.1.3
> Reporter: Adinata
> Priority: Major
> Attachments: dag_error.py, error.log, error_on_clear_dag.txt,
> ubuntu-14-packages.log, ubuntu-16-oops.log, ubuntu-16-packages.log
>
>
> {code}
> ____/ ( ( ) ) \___
> /( ( ( ) _ )) ) )\
> (( ( )( ) ) ( ) )
> ((/ ( _( ) ( _) ) ( () ) )
> ( ( ( (_) (( ( ) .((_ ) . )_
> ( ( ) ( ( ) ) ) . ) ( )
> ( ( ( ( ) ( _ ( _) ). ) . ) ) ( )
> ( ( ( ) ( ) ( )) ) _)( ) ) )
> ( ( ( \ ) ( (_ ( ) ( ) ) ) ) )) ( )
> ( ( ( ( (_ ( ) ( _ ) ) ( ) ) )
> ( ( ( ( ( ) (_ ) ) ) _) ) _( ( )
> (( ( )( ( _ ) _) _(_ ( (_ )
> (_((__(_(__(( ( ( | ) ) ) )_))__))_)___)
> ((__) \\||lll|l||/// \_))
> ( /(/ ( ) ) )\ )
> ( ( ( ( | | ) ) )\ )
> ( /(| / ( )) ) ) )) )
> ( ( ((((_(|)_))))) )
> ( ||\(|(|)|/|| )
> ( |(||(||)|||| )
> ( //|/l|||)|\\ \ )
> (/ / // /|//||||\\ \ \ \ _)
> -------------------------------------------------------------------------------
> Node: 9889a7c79e9b
> -------------------------------------------------------------------------------
> Traceback (most recent call last):
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1817, in
> wsgi_app
> response = self.full_dispatch_request()
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1477, in
> full_dispatch_request
> rv = self.handle_user_exception(e)
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1381, in
> handle_user_exception
> reraise(exc_type, exc_value, tb)
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1475, in
> full_dispatch_request
> rv = self.dispatch_request()
> File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1461, in
> dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
> File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 68,
> in inner
> return self._run_view(f, *args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line
> 367, in _run_view
> return fn(self, *args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 755, in
> decorated_view
> return func(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line
> 118, in wrapper
> return f(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line
> 167, in wrapper
> return f(*args, **kwargs)
> File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line
> 1017, in clear
> include_upstream=upstream)
> File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2870,
> in sub_dag
> dag = copy.deepcopy(self)
> File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
> y = copier(memo)
> File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2856,
> in __deepcopy__
> setattr(result, k, copy.deepcopy(v, memo))
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 174, in deepcopy
> y = copier(memo)
> File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1974,
> in __deepcopy__
> setattr(result, k, copy.deepcopy(v, memo))
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 230, in _deepcopy_list
> y.append(deepcopy(a, memo))
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
> state = deepcopy(state, memo)
> File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
> y = copier(x, memo)
> File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
> y = _reconstruct(x, rv, 1, memo)
> File "/usr/lib/python2.7/copy.py", line 329, in _reconstruct
> y = callable(*args)
> File "/usr/lib/python2.7/copy_reg.py", line 93, in __newobj__
> return cls.__new__(cls, *args)
> TypeError: object.__new__(thread.lock) is not safe, use thread.lock.__new__()
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)