Hello,

Not sure if this is the correct place to ask, but I couldn't find anywhere
better. I'm trying to create a custom Spark operator that, at the moment,
basically does the same thing as a BashOperator, but with some additional
features. Eventually it won't be a duplicate, but I can't get it working as
is. Should this be done as a plugin, rather than as a custom operator that
inherits from BaseOperator?
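
In case it's relevant, the plugin route I was considering looks roughly like
this (the file and module names here are just placeholders, and I'm going from
the plugin docs, so apologies if I've misread how the operators get exposed):

# plugins/spark_plugin.py  (placeholder file name)
from airflow.plugins_manager import AirflowPlugin

# the operator class attached at the end of this mail
from spark_operator import SparkOperator


class SparkPlugin(AirflowPlugin):
    # as I understand it, operators listed here become importable under
    # airflow.operators once the plugin is picked up
    name = "SparkOperator"
    operators = [SparkOperator]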

I've attached the custom Spark operator and the DAG file for review, as they
are a bit long to quote inline (both appear at the end of this mail). The
exception I receive when attempting to run the DAG is the following:

[2016-07-25 21:53:24,302] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False, func=<function run at 0x7fbb9fbb6e60>, ignore_dependencies=False, ignore_depends_on_past=False, job_id=None, local=False, mark_success=False, pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run', subdir='/opt/spotx-hadoop-airflow/dags', task_id='run')
Sending to executor.
[2016-07-25 21:53:24,983] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0), force=False, func=<function run at 0x7fbd7b2f7e60>, ignore_dependencies=False, ignore_depends_on_past=False, job_id=None, local=True, mark_success=False, pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run', subdir='DAGS_FOLDER/spark_test.py', task_id='run')
Traceback (most recent call last):
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/bin/airflow", line 16, in <module>
    args.func(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 206, in run
    dag = get_dag(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 73, in get_dag
    dagbag = DagBag(process_subdir(args.subdir))
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 166, in __init__
    self.collect_dags(dag_folder)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 385, in collect_dags
    self.process_file(dag_folder, only_if_updated=only_if_updated)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 292, in process_file
    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 341, in bag_dag
    dag.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2715, in resolve_template_files
    t.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2033, in resolve_template_files
    content = getattr(self, attr)
AttributeError: 'SparkOperator' object has no attribute 's'
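
Looking at resolve_template_files in models.py, it appears to loop over
template_fields and getattr() each entry, so my guess is that defining
template_fields as a plain string makes it iterate character by character
(hence the missing attribute 's'). A quick illustration of what I mean:

# iterating over a string yields its characters...
for attr in 'spark_jar':
    print attr          # 's', 'p', 'a', ...

# ...while a one-element tuple yields the field name itself
for attr in ('spark_jar',):
    print attr          # 'spark_jar'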

Many thanks,
Ben
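
# spark_test.py -- the attached DAG file (referenced in the traceback above)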
import sys
from airflow import DAG
from airflow.operators import BashOperator
from airflow.operators.SparkOperator import SparkOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'brstorrie',
    'depends_on_past': False,
    'start_date': datetime(2016,7,30),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=60),
}

spark_args = {
    "master": "yarn",
    "deploy_mode": "cluster",
    "num_executors": 10,
    "executor_memory": "4G",
    "jar_args": "--dbuser cpc",
}


dag = DAG(
    'spark_operator', default_args=default_args, schedule_interval="25 * * * *")

task1 = SparkOperator(
    task_id='run',
    spark_jar='/code/spark-reporting/spotx-spark-reports-SNAPSHOT.jar',
    op_kwargs=spark_args,
    dag=dag
)
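
# The attached custom operator (imported above as airflow.operators.SparkOperator)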
import os
import sys
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile
from airflow.utils.file import TemporaryDirectory
import logging

# Base class we derive from, plus the decorator and the exception we raise
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SparkOperator(BaseOperator):
    """
    Execute a spark submit job

    Required Params:
    
    :param spark_jar: The path to the jar to run

    :param op_kwargs: A dict of args for spark

    Optional Params:
    
    :param spark_dir (Optional): the directory to find the spark-submit
        executable, defaults to /usr/bin/

    :param master (Optional): master url to use, local or yarn
        defaults to yarn

    todo:: Implement KDC when KDC is deployed
    """
    template_fields = 'spark_jar'
    template_ext = ('.jar', '.py')
    # TODO: implement unique color
    ui_color = '#f0ede4'
    
    @apply_defaults
    def __init__(
            self,
            spark_jar,
            op_kwargs=None,
            spark_dir="/usr/bin/",
            master="yarn",
            env=None,
            output_encoding='utf-8',
            xcom_push=False,
            *args, **kwargs):
        super(SparkOperator, self).__init__(*args, **kwargs)
        self.spark_jar = spark_jar
        self.op_kwargs = op_kwargs
        self.spark_dir = spark_dir
        self.master = master
        # attributes used by run_bash_cmd below (same names as in BashOperator)
        self.env = env
        self.output_encoding = output_encoding
        self.xcom_push_flag = xcom_push
        
    def execute(self, context):
        """
        Execute the spark command
        """
        logging.info("Executing SparkOperator")
        self.build_spark_command()
        # return the last line of output so it can be pushed to XCom if requested
        return self.run_bash_cmd()


    def build_spark_command(self):
        """
        Build the spark-submit command and store it on the operator
        """
        executable = os.path.join(
            self.spark_dir,
            "spark-submit"
        )
        processed_args = self.process_kwargs()

        self.command_to_execute = executable + " " + processed_args

        logging.info("Running {cmd}".format(cmd=self.command_to_execute))

    def process_kwargs(self):
        """
        Build the spark-submit argument string from op_kwargs

        :return: processed_args
        :return_type: string
        """
        processed_args = ""
        op_kwargs = self.op_kwargs or {}
        jar_args = [v for k, v in op_kwargs.items() if k == "jar_args"]
        all_args = {k.replace('_', '-'): v
                    for k, v in op_kwargs.items() if k != "jar_args"}

        # Build the option string, e.g. "--num-executors 10 "
        for k, v in all_args.items():
            processed_args += "--" + k + " " + str(v) + " "

        # spark-submit expects the application jar after the options,
        # followed by any arguments for the application itself
        processed_args += self.spark_jar

        if len(jar_args) == 1:
            processed_args += " " + jar_args[0]

        return processed_args
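
    # For example, with the spark_args dict from the DAG file above, the intent
    # is roughly the following command (option order may vary, since dict
    # ordering isn't guaranteed here):
    #   /usr/bin/spark-submit --master yarn --deploy-mode cluster \
    #       --num-executors 10 --executor-memory 4G \
    #       /code/spark-reporting/spotx-spark-reports-SNAPSHOT.jar --dbuser cpc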

    def run_bash_cmd(self):
        bash_command = self.command_to_execute
        logging.info("tmp dir root location: \n" + gettempdir())
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                # Python 2: bash_command is already a byte string
                f.write(bash_command)
                f.flush()
                script_location = f.name
                logging.info("Temporary script "
                             "location :{0}".format(script_location))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', script_location],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env)

                self.sp = sp

                logging.info("Output:")
                line = ''
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    logging.info(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))

                if sp.returncode:
                    raise AirflowException("spark-submit command failed")

        if self.xcom_push_flag:
            return line
