niraja b created AIRFLOW-2327:
---------------------------------
Summary: Cannot pickle PythonOperator dags using Mesos Executor
Key: AIRFLOW-2327
URL: https://issues.apache.org/jira/browse/AIRFLOW-2327
Project: Apache Airflow
Issue Type: Bug
Components: contrib
Affects Versions: Airflow 1.9.0
Environment: prod
Reporter: niraja b
We are using Airflow's MesosExecutor.
BashOperator and SimpleHTTPOperator work for us.
The scheduler is started with -p to pickle the DAGs.
The issue is with the following sample code. We tried PythonOperator and
PythonVirtualenvOperator, with and without use_dill, but couldn't get either
to run successfully on the agent.
from __future__ import print_function
from airflow.models import DAG
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator

DAG_ID = "testdag"
DEFAULT_ARGS = {
    "start_date": datetime(2018, 4, 16, 1, 50, 16),
    "schedule_interval": None,
    "dagrun_timeout": timedelta(minutes=60),
    "email": ['[email protected]'],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=5),
}


def _testlambda(**kwargs):
    print("hello world")


with DAG(dag_id=DAG_ID, default_args=DEFAULT_ARGS) as dag:
    (
        PythonVirtualenvOperator(
            task_id='python_1',
            python_callable=_testlambda,
            use_dill=True,
            requirements=['dill']
        )
    )
Error:
Traceback (most recent call last):
  File "/usr/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 358, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2789, in first
    ret = list(self[0:1])
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2581, in __getitem__
    return list(res)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 137, in instances
    util.raise_from_cause(err)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 102, in instances
    logging.debug(str(fetch[0]))
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/result.py", line 156, in __repr__
    return repr(sql_util._repr_row(self))
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/util.py", line 329, in __repr__
    ", ".join(trunc(value) for value in self.row),
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1588, in process
    return loads(value)
  File "/usr/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
    return load(file)
  File "/usr/lib/python2.7/site-packages/dill/dill.py", line 288, in load
    obj = pik.load()
  File "/usr/lib64/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib64/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib64/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named unusual_prefix_ac646764c974ff68b827793414d8eabcdca720cf_dmitrydag
I0416 11:22:34.367975 47476 executor.cpp:938] Command exited with status 1 (pid: 47482)
I0416 11:22:35.371712 47481 process.cpp:887] Failed to accept socket: future discarded
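A likely reading of the traceback: the pickled DAG appears to refer to _testlambda by reference (module name plus function name), and the recorded module name is the generated one shown in the ImportError (unusual_prefix_<hash>_dmitrydag). That module exists only in the process that parsed the DAG file, so when the agent unpickles the DagPickle row, find_class calls __import__ on that name and fails. This would also be consistent with BashOperator and SimpleHTTPOperator working, since those tasks hold no function defined in the DAG file. The sketch below reproduces that failure mode in isolation, under stated assumptions: it uses the standard-library pickle instead of dill, Python 3 instead of 2.7, and a made-up module name (MODULE_NAME) and helpers (load_as_private_module, pickle_then_unpickle_elsewhere) that are purely illustrative and not part of Airflow.

```python
import pickle
import sys
import types

# Hypothetical module name imitating the "unusual_prefix_<hash>_<file>" name
# seen in the traceback; the hash and file name here are made up.
MODULE_NAME = "unusual_prefix_0123abcd_exampledag"

# Stand-in for the DAG file: only the callable matters for this sketch.
DAG_SOURCE = '''
def _testlambda(**kwargs):
    print("hello world")
'''


def load_as_private_module(name, source):
    """Execute source in a fresh module registered under an arbitrary name.

    Simplified stand-in (an assumption, not Airflow's code) for loading a DAG
    file under a generated module name that other processes cannot import.
    """
    module = types.ModuleType(name)
    exec(source, module.__dict__)  # functions get __module__ == name
    sys.modules[name] = module
    return module


def pickle_then_unpickle_elsewhere(name, source):
    """Pickle a function while its module is loaded, then try to unpickle it
    after the module is gone (standing in for an agent that never parsed the
    DAG file). Returns the payload and the exception raised, if any."""
    module = load_as_private_module(name, source)
    # pickle stores a plain function by reference: (module name, qualified name).
    payload = pickle.dumps(module._testlambda)
    # Simulate the remote side, where the generated module does not exist.
    del sys.modules[name]
    try:
        pickle.loads(payload)
    except ImportError as exc:  # find_class -> __import__(module) fails here
        return payload, exc
    return payload, None


if __name__ == "__main__":
    payload, error = pickle_then_unpickle_elsewhere(MODULE_NAME, DAG_SOURCE)
    print(MODULE_NAME.encode() in payload)  # the module name is in the payload
    print(type(error).__name__, error)
```

On Python 3.6+ this prints True, then a ModuleNotFoundError (a subclass of ImportError) naming the made-up module, which mirrors the last frames of the agent-side traceback. Only the function's location travels in the payload, not its code, which is why the module must be importable wherever the pickle is loaded.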
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)