Hi Fokko,

Thanks for your reply. I use version 1.9.0.
-----Original Message-----
From: [email protected] <[email protected]> On Behalf Of Driesprong, Fokko
Sent: Saturday, April 28, 2018 10:54 PM
To: [email protected]
Subject: Re: Problem with SparkSubmit

Hi Anton,

Which version of Airflow are you running?

Cheers, Fokko

2018-04-27 10:24 GMT+02:00 Anton Mushin <[email protected]>:
> Hi all,
> I have a problem with the Spark operator. I get an exception:
>
> user@host:/# airflow test myDAG myTask 2018-04-26
> [2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
> [2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
> [2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor SequentialExecutor
> [2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
> [2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to: sparkhost
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 27, in <module>
>     args.func(args)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 528, in test
>     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1584, in run
>     session=session)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
>     self._hook.submit(self._application)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
>     **kwargs)
>   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
>     restore_signals, start_new_session)
>   File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
>     restore_signals, start_new_session, preexec_fn)
> TypeError: Can't convert 'list' object to str implicitly
>
> My DAG looks like:
>
> import os
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
>
> default_args = {
>     'owner': 'spark',
>     'depends_on_past': False,
>     'start_date': datetime.now(),
>     'retries': 1,
>     'retry_delay': timedelta(minutes=1)
> }
>
> dag = DAG('myDAG', default_args=default_args)
>
> connection_id = "SPARK"
> os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'
>
> _config = {
>     'jars': 'spark_job.jar',
>     'executor_memory': '2g',
>     'name': 'myJob',
>     'conn_id': connection_id,
>     'java_class': 'org.Job'
> }
>
> operator = SparkSubmitOperator(
>     task_id='myTask',
>     dag=dag,
>     **_config
> )
>
> What is wrong? Could somebody help me with it?
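For what it's worth, the TypeError at the bottom of that traceback is the one Python 3.5 raises when an element of the argument list handed to subprocess.Popen is not a string. I have not traced which value in the hook's spark-submit command ends up as a list here, so the snippet below is only a minimal sketch that reproduces the same class of error with a deliberately nested list (a hypothetical stand-in for the offending value), not the actual Airflow code path:

```python
import subprocess

# Sketch: every element of Popen's argument list must be a string.
# A nested list (stand-in for whatever non-string value reaches the
# spark-submit command) raises TypeError before the child process starts.
try:
    subprocess.Popen(["echo", ["not", "a", "string"]])
except TypeError as exc:
    # On Python 3.5 the message reads "Can't convert 'list' object to str
    # implicitly"; newer interpreters word it differently.
    print(type(exc).__name__)  # prints TypeError
```

So a reasonable first diagnostic step is to check that every value you pass into the operator (and everything stored on the Spark connection) is a plain string rather than a list.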
