Hello,
Not sure if this is the correct place to ask, but I couldn't find anywhere
better. I'm trying to create a custom Spark operator that, for the moment,
basically accomplishes the same thing as a BashOperator, but with some
additional features. Eventually it won't be a duplicate, but I can't get it
working as is. Should this be done as a plugin, rather than as a custom
operator that inherits from BaseOperator?
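For context, my (possibly wrong) understanding of the plugin route is that I
would register the operator from a module under $AIRFLOW_HOME/plugins,
roughly like this (the module and plugin names here are made up, not part of
my current setup):

    # Rough sketch of the plugin approach as I understand it; the module
    # name "spark_operator" and the plugin name are hypothetical.
    from airflow.plugins_manager import AirflowPlugin
    from spark_operator import SparkOperator  # hypothetical module name

    class SparkOperatorPlugin(AirflowPlugin):
        name = "spark_operator_plugin"
        operators = [SparkOperator]

Right now I've skipped that and just import the operator class directly (see
the DAG file below), which may itself be part of the problem.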
I've attached the custom Spark operator and the DAG file for review, as they
are too large to paste in the body (both are reproduced at the end of this
message). The exception I receive when attempting to run the DAG is the
following:
[2016-07-25 21:53:24,302] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False, func=<function run at 0x7fbb9fbb6e60>, ignore_dependencies=False, ignore_depends_on_past=False, job_id=None, local=False, mark_success=False, pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run', subdir='/opt/spotx-hadoop-airflow/dags', task_id='run')
Sending to executor.
[2016-07-25 21:53:24,983] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False, func=<function run at 0x7fbd7b2f7e60>, ignore_dependencies=False, ignore_depends_on_past=False, job_id=None, local=True, mark_success=False, pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run', subdir='DAGS_FOLDER/spark_test.py', task_id='run')
Traceback (most recent call last):
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/bin/airflow", line 16, in <module>
    args.func(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 206, in run
    dag = get_dag(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 73, in get_dag
    dagbag = DagBag(process_subdir(args.subdir))
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 166, in __init__
    self.collect_dags(dag_folder)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 385, in collect_dags
    self.process_file(dag_folder, only_if_updated=only_if_updated)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 292, in process_file
    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 341, in bag_dag
    dag.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2715, in resolve_template_files
    t.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2033, in resolve_template_files
    content = getattr(self, attr)
AttributeError: 'SparkOperator' object has no attribute 's'
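For reference, here's my rough reading of what that last frame (models.py,
line 2033) is doing -- this is a paraphrase based on the traceback, not the
actual Airflow source:

    # Paraphrase only: resolve_template_files appears to loop over the
    # operator's template_fields and getattr each entry; 'task' below
    # stands in for my operator instance.
    for attr in task.template_fields:
        content = getattr(task, attr)   # <- line 2033 in the traceback
        # ...fields whose value ends with one of template_ext then get rendered

Since my operator declares template_fields = 'spark_jar' as a plain string,
iterating it would yield single characters, so the first lookup would
effectively be getattr(task, 's'), which matches the error. I'm guessing it
needs to be a tuple like ('spark_jar',), but I'd still like to know whether
subclassing BaseOperator like this is the right approach, or whether it
should go through the plugin mechanism instead.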
Many thanks,
Ben
---- spark_test.py (the DAG file) ----

import sys
from airflow import DAG
from airflow.operators import BashOperator
from airflow.operators.SparkOperator import SparkOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'brstorrie',
    'depends_on_past': False,
    'start_date': datetime(2016, 7, 30),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=60),
}

spark_args = {
    "master": "yarn",
    "deploy_mode": "cluster",
    "num_executors": 10,
    "executor_memory": "4G",
    "jar_args": "--dbuser cpc",
}
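
# Note on the dict above: in the operator, process_kwargs() turns each key
# into a "--key value" flag (underscores become hyphens) and appends the
# "jar_args" entry verbatim at the end of the command line.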

dag = DAG(
    'spark_operator', default_args=default_args, schedule_interval="25 * * * *")

task1 = SparkOperator(
    task_id='run',
    spark_jar='/code/spark-reporting/spotx-spark-reports-SNAPSHOT.jar',
    op_kwargs=spark_args,
    dag=dag
)

---- SparkOperator.py (the custom operator) ----

import os
import sys
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile
from airflow.utils.file import TemporaryDirectory
from airflow.exceptions import AirflowException
import logging

# Importing base classes that we need to derive
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class SparkOperator(BaseOperator):
    """
    Execute a spark-submit job.

    Required params:
    :param spark_jar: the path to the jar to run
    :param op_kwargs: a dict of args for spark

    Optional params:
    :param spark_dir: the directory containing the spark-submit
        executable, defaults to /usr/bin/
    :param master: master URL to use, local or yarn, defaults to yarn

    todo:: Implement KDC when KDC is deployed
    """
    template_fields = 'spark_jar'
    template_ext = ('.jar', '.py')
    # TODO: implement unique color
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            spark_jar,
            op_kwargs=None,
            spark_dir="/usr/bin/",
            master="yarn",
            *args, **kwargs):
        super(SparkOperator, self).__init__(*args, **kwargs)
        self.spark_jar = spark_jar
        self.op_kwargs = op_kwargs
        self.spark_dir = spark_dir
        self.master = master

    def execute(self, context):
        """
        Execute the spark command
        """
        print "Hit execute"
        self.build_spark_command()
        self.run_bash_cmd()

    def build_spark_command(self):
        """
        Build the spark command and store it in self.command_to_execute
        :return:
        """
        executable = os.path.join(
            self.spark_dir,
            "spark-submit"
        )
        processed_args = self.process_kwargs()
        self.command_to_execute = executable + " " + processed_args
        print "Running {cmd}".format(cmd=self.command_to_execute)

    def process_kwargs(self):
        """
        Build arguments from op_kwargs
        :return: processed_args
        :return_type: string
        """
        processed_args = ""
        jar_args = [v for k, v in self.op_kwargs.items() if k == "jar_args"]
        all_args = {k.replace('_', '-'): v
                    for k, v in self.op_kwargs.items() if k != "jar_args"}
        # Build arg string; str() so non-string values like num_executors=10
        # concatenate cleanly
        for k, v in all_args.items():
            processed_args += "--" + k + " " + str(v) + " "
        if len(jar_args) == 1:
            processed_args += " " + jar_args[0]
        return processed_args
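
    # run_bash_cmd mirrors BashOperator's approach: write the assembled
    # command to a temporary script, run it with bash, and stream its
    # output into the task log.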
    def run_bash_cmd(self):
        bash_command = self.command_to_execute
        logging.info("tmp dir root location: \n" + gettempdir())
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
                f.write(bash_command)
                f.flush()
                fname = f.name
                logging.info("Temporary script "
                             "location: {0}".format(fname))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    # inherit the scheduler's environment so spark-submit
                    # can find its config
                    cwd=tmp_dir)
                self.sp = sp
                logging.info("Output:")
                line = ''
                for line in iter(sp.stdout.readline, b''):
                    line = line.strip()
                    logging.info(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))
                if sp.returncode:
                    raise AirflowException("Bash command failed")
                # return the last line of output (BashOperator does this
                # for XCom pushes)
                return line
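
For what it's worth, with the spark_args above I'd expect build_spark_command
to end up with something roughly like this (argument order will vary, since
it comes from a dict):

    /usr/bin/spark-submit --master yarn --deploy-mode cluster --num-executors 10 --executor-memory 4G --dbuser cpc

I realise the spark_jar path isn't wired into the command yet; I haven't been
able to get that far because the DAG fails to parse with the exception above.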