`template_fields` should be a proper iterable (list or tuple). Change
`template_fields = 'spark_jar'` to `template_fields = ('spark_jar',)`.
With a bare string, Airflow iterates over it character by character when
resolving templates, so `resolve_template_files` ends up calling
`getattr(self, 's')` first, which is exactly the AttributeError you're
seeing. A sketch of the fix follows the quoted message below.

Max

On Mon, Jul 25, 2016 at 3:06 PM, Ben Storrie <[email protected]> wrote:

> Hello,
>
> Not sure if this is the correct place to ask, but I couldn't find anywhere
> better to ask. I'm trying to create a custom Spark operator that, at the
> moment, will basically accomplish the same as a BashOperator, but with some
> additional features. Eventually it will not be a duplicate, but I cannot
> get it working as is. Should this be done as a plugin, rather than a custom
> operator that inherits from BaseOperator?
>
> I've attached the custom Spark operator and the dag file for review, as
> they are too large for this. The exception I receive when attempting to run
> the dag is the following:
>
> [2016-07-25 21:53:24,302] {__init__.py:36} INFO - Using executor LocalExecutor
> Namespace(dag_id='spark_operator_2',
> execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False,
> func=<function run at 0x7fbb9fbb6e60>, ignore_dependencies=False,
> ignore_depends_on_past=False, job_id=None, local=False, mark_success=False,
> pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run',
> subdir='/opt/spotx-hadoop-airflow/dags', task_id='run')
> Sending to executor.
> [2016-07-25 21:53:24,983] {__init__.py:36} INFO - Using executor LocalExecutor
> Namespace(dag_id='spark_operator_2',
> execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False,
> func=<function run at 0x7fbd7b2f7e60>, ignore_dependencies=False,
> ignore_depends_on_past=False, job_id=None, local=True, mark_success=False,
> pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run',
> subdir='DAGS_FOLDER/spark_test.py', task_id='run')
> Traceback (most recent call last):
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/bin/airflow",
>     line 16, in <module>
>     args.func(args)
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py",
>     line 206, in run
>     dag = get_dag(args)
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py",
>     line 73, in get_dag
>     dagbag = DagBag(process_subdir(args.subdir))
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
>     line 166, in __init__
>     self.collect_dags(dag_folder)
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
>     line 385, in collect_dags
>     self.process_file(dag_folder, only_if_updated=only_if_updated)
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
>     line 292, in process_file
>     self.bag_dag(dag, parent_dag=dag, root_dag=dag)
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
>     line 341, in bag_dag
>     dag.resolve_template_files()
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
>     line 2715, in resolve_template_files
>     t.resolve_template_files()
>   File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
>     line 2033, in resolve_template_files
>     content = getattr(self, attr)
> AttributeError: 'SparkOperator' object has no attribute 's'
>
> Many thanks,
> Ben
>
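For illustration, here is a minimal sketch of an operator with the corrected `template_fields`. The `spark_jar` argument and placeholder `execute` body are assumptions based on the description above, and the `apply_defaults` import path varies across Airflow versions:

```python
import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults  # path varies by Airflow version


class SparkOperator(BaseOperator):
    # template_fields must be a list or tuple of attribute names.
    # A bare string is iterated character by character during template
    # resolution, producing the "no attribute 's'" error.
    template_fields = ('spark_jar',)

    @apply_defaults
    def __init__(self, spark_jar, *args, **kwargs):
        super(SparkOperator, self).__init__(*args, **kwargs)
        self.spark_jar = spark_jar

    def execute(self, context):
        # Placeholder: the real operator would submit the Spark job here.
        logging.info('Would submit jar: %s', self.spark_jar)
```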
