No excuse necessary, we are glad you found the issue!

Best, Arthur
On Tue, May 8, 2018 at 8:06 AM Anton Mushin <[email protected]> wrote:

> I found a mistake in my connection configuration, so the problem is no
> longer relevant. Excuse me for the trouble.
>
> Best Regards,
> Anton
>
> -----Original Message-----
> From: Anton Mushin <[email protected]>
> Sent: Tuesday, May 08, 2018 6:47 PM
> To: [email protected]
> Subject: RE: Problem with SparkSubmit
>
> Hi Fokko,
> Thanks for your help.
>
> I got a new error:
>
> ERROR - [Errno 2] No such file or directory: 'spark-submit'
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
>     self._hook.submit(self._application)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
>     **kwargs)
>   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
>     restore_signals, start_new_session)
>   File "/usr/lib/python3.5/subprocess.py", line 1551, in _execute_child
>     raise child_exception_type(errno_num, err_msg)
>
> In my case spark-submit isn't on the PATH and I can't add it there. I
> can't find any information on how to configure the SparkSubmitOperator
> for this case. Could you help me? Should I set something in os.environ
> to define the path to the spark-submit script?
>
> Best Regards,
> Anton
>
>
> -----Original Message-----
> From: [email protected] <[email protected]> On Behalf Of Driesprong, Fokko
> Sent: Monday, May 07, 2018 9:21 PM
> To: [email protected]
> Subject: Re: Problem with SparkSubmit
>
> Hi Anton,
>
> I see the issue now. You're passing the jar to the jars argument, but
> that argument is for additional jars that need to be put on the Spark
> classpath, for example jars that provide UDFs. You need to pass the jar
> to the application argument:
>
> _config = {
>     'application': 'spark_job.jar',
>     'executor_memory': '2g',
>     'name': 'myJob',
>     'conn_id': connection_id,
>     'java_class': 'org.Job'
> }
>
> operator = SparkSubmitOperator(
>     task_id='myTask',
>     dag=dag,
>     **_config
> )
>
> Hope this helps.
>
> Cheers, Fokko
>
> 2018-05-07 6:54 GMT+02:00 Anton Mushin <[email protected]>:
>
> > Hi Fokko,
> > Thanks for your reply.
> >
> > I use version 1.9.0.
> >
> > -----Original Message-----
> > From: [email protected] <[email protected]> On Behalf Of Driesprong, Fokko
> > Sent: Saturday, April 28, 2018 10:54 PM
> > To: [email protected]
> > Subject: Re: Problem with SparkSubmit
> >
> > Hi Anton,
> >
> > Which version of Airflow are you running?
> >
> > Cheers, Fokko
> >
> > 2018-04-27 10:24 GMT+02:00 Anton Mushin <[email protected]>:
> >
> > > Hi all,
> > > I have a problem with the Spark operator.
> > > I get this exception:
> > >
> > > user@host:/# airflow test myDAG myTask 2018-04-26
> > > [2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
> > > [2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
> > > [2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor SequentialExecutor
> > > [2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
> > > [2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to: sparkhost
> > > Traceback (most recent call last):
> > >   File "/usr/local/bin/airflow", line 27, in <module>
> > >     args.func(args)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 528, in test
> > >     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
> > >     result = func(*args, **kwargs)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1584, in run
> > >     session=session)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
> > >     result = func(*args, **kwargs)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
> > >     result = task_copy.execute(context=context)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
> > >     self._hook.submit(self._application)
> > >   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
> > >     **kwargs)
> > >   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
> > >     restore_signals, start_new_session)
> > >   File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
> > >     restore_signals, start_new_session, preexec_fn)
> > > TypeError: Can't convert 'list' object to str implicitly
> > >
> > > My DAG looks like:
> > >
> > > import os
> > > from datetime import datetime, timedelta
> > >
> > > from airflow import DAG
> > > from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
> > >
> > > default_args = {
> > >     'owner': 'spark',
> > >     'depends_on_past': False,
> > >     'start_date': datetime.now(),
> > >     'retries': 1,
> > >     'retry_delay': timedelta(minutes=1)
> > > }
> > >
> > > dag = DAG('myDAG', default_args=default_args)
> > >
> > > connection_id = "SPARK"
> > > os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'
> > >
> > > _config = {
> > >     'jars': 'spark_job.jar',
> > >     'executor_memory': '2g',
> > >     'name': 'myJob',
> > >     'conn_id': connection_id,
> > >     'java_class': 'org.Job'
> > > }
> > >
> > > operator = SparkSubmitOperator(
> > >     task_id='myTask',
> > >     dag=dag,
> > >     **_config
> > > )
> > >
> > > What is wrong? Could somebody help me with it?
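
For reference, here is the thread's fix applied end to end. This is a
minimal sketch assembled from the snippets above, not a tested
configuration: the jar moves from 'jars' to 'application' as Fokko
suggested, and os is imported explicitly since the DAG file reads
os.environ. spark_job.jar, sparkhost, and org.Job are the placeholder
names from the thread.

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

default_args = {
    'owner': 'spark',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG('myDAG', default_args=default_args)

# Register the Spark connection through an environment variable,
# as in the original DAG file.
connection_id = 'SPARK'
os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'

_config = {
    # The main jar goes to 'application'; 'jars' is only for extra
    # classpath entries such as jars that provide UDFs.
    'application': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
    'java_class': 'org.Job'
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)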

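On the spark-submit-not-on-PATH question that was left open above, one
possible workaround, sketched under the assumption of Airflow 1.9's
contrib SparkSubmitHook and a hypothetical Spark install at /opt/spark,
is to extend the PATH of the process that runs the task:

import os

# Make spark-submit findable for the scheduler/worker process, e.g. at
# the top of the DAG file or in the service's environment.
# /opt/spark is an assumed install location, not something from the thread.
os.environ['PATH'] = '/opt/spark/bin' + os.pathsep + os.environ.get('PATH', '')

Alternatively, the 1.9 hook reads a spark-home key from the connection's
Extra field and then invokes <spark-home>/bin/spark-submit directly, so
setting Extra to {"spark-home": "/opt/spark"} on the SPARK connection
should avoid touching the PATH at all.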