Hi Boris,

When kicking off Spark jobs using Airflow, cluster mode is highly
recommended since the workload of the driver is on the Hadoop cluster, and
not on the Airflow machine itself. Personally I prefer the spark-submit
operator since it will pull all the connection variables directly from
Airflow, and you'll end up with a central place (Airflow connections) where
all the configuration is kept. Otherwise you'll end up with configuration
within your Airflow logic.

Cheers, Fokko

2017-10-15 17:16 GMT+02:00 Boris <[email protected]>:

> Thanks Fokko. Do you know if it is better to use pyspark directly within
> python operator or invoke submit-job instead? My understanding in both
> cases airflow uses yarn-client deployment mode, not yarn-cluster and spark
> driver always runs on the same node with airflow worker. Not sure it is the
> best practice...
>
> On Oct 15, 2017 05:04, "Driesprong, Fokko" <[email protected]> wrote:
>
> > Hi Boris,
> >
> > Instead of writing it to a file, you can also write it to xcom, this will
> > keep everything inside of Airflow. My personal opinion on this; spark-sql
> > is a bit limited by nature, it only support SQL. If you want to do more
> > dynamic stuff, you will eventually have to move to spark-submit anyway.
> >
> > Cheers, Fokko
> >
> > 2017-10-14 14:45 GMT+02:00 Boris <[email protected]>:
> >
> > > Thanks Fokko, I think it will do it but my concern that in this case my
> > dag
> > > will initiate two separate spark sessions and it takes about 20 seconds
> > in
> > > our yarn environment to create it. I need to run 600 dags like that
> every
> > > morning.
> > >
> > > I am thinking now to create a pyspark job that will do insert and count
> > and
> > > write it to a temp file. Still not ideal... I wish I could just parse
> > spark
> > > SQL instead..
> > >
> > > On Oct 14, 2017 8:05 AM, "Driesprong, Fokko" <[email protected]>
> > wrote:
> > >
> > > > Hi Boris,
> > > >
> > > > That sounds like a nice DAG.
> > > >
> > > > This is how I would do it: First run the long running query in a
> > > spark-sql
> > > > operator like you have now. Create a python function that builds a
> > > > SparkSession within Python (using the Spark pyspark api) and fetches
> > the
> > > > count from the spark partition that you've just created. Create a
> > > > BranchPythonOperator that will invoke this function, and based on, if
> > the
> > > > count is ok or not, branch:
> > > >
> > > >    - If the count is okay, branch downstream and continue with the
> > normal
> > > >    execution.
> > > >    - If the count is off, terminate and send you and email/slack that
> > the
> > > >    count is not as expected.
> > > >
> > > > ​This will look something like this:
> > > > [image: Inline afbeelding 1]​
> > > >
> > > > Would this solve your problem?
> > > >
> > > > Cheers, Fokko
> > > >
> > > >
> > > >
> > > > 2017-10-14 13:42 GMT+02:00 Boris Tyukin <[email protected]>:
> > > >
> > > >> Hi Fokko, thanks for your response, really appreciate it!
> > > >>
> > > >> Basically in my case I have two Spark SQL queries:
> > > >>
> > > >> 1) the first query does INSERT OVERWRITE to a partition and may
> take a
> > > >> while for a while
> > > >> 2) then I run a second query right after it to get count of rows of
> > that
> > > >> partition.
> > > >> 3) I need to pass that count back to airflow dag and this count will
> > be
> > > >> used by the next task in the DAG to make a decision if this
> partition
> > > >> should be safely exchanged (using ALTER TABLE EXCHANGE PARTITION)
> > with a
> > > >> production table partition.
> > > >>
> > > >> So I need somehow to get that count of rows. My initial though was
> to
> > > >> parse
> > > >> the log and extract that count but looks like even if i do regex it
> > does
> > > >> not quite work - spark sql writes query output to stdout which
> airflow
> > > >> spark sql hook does not capture right now.
> > > >>
> > > >> if you can suggest a better solution for me it would be great!
> > > >>
> > > >> Also initially I wanted to count rows and then do ALTER TABLE
> EXCHANGE
> > > >> PARTITION in the same pyspark job but I found out that spark does
> not
> > > >> support this statement yet and I have to use Hive.
> > > >>
> > > >> On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko
> > <[email protected]
> > > >
> > > >> wrote:
> > > >>
> > > >> > Hi Boris,
> > > >> >
> > > >> > Thank you for your question and excuse me for the late response,
> > > >> currently
> > > >> > I'm on holiday.
> > > >> >
> > > >> > The solution that you suggest, would not be my preferred choice.
> > > >> Extracting
> > > >> > results from a log using a regex is expensive in terms of
> > > computational
> > > >> > costs, and error prone. My question is, what are you trying to
> > > >> accomplish?
> > > >> > For me there are two ways of using the Spark-sql operator:
> > > >> >
> > > >> >    1. ETL Using Spark: Instead of returning the results, write the
> > > >> results
> > > >> >    back to a new table, or a new partition within the table. This
> > data
> > > >> can
> > > >> > be
> > > >> >    used downstream in the dag. Also, this will write the data to
> > hdfs
> > > >> > which is
> > > >> >    nice for persistance.
> > > >> >    2. Write the data in a simple and widely supported format (such
> > as
> > > >> csv)
> > > >> >    onto hdfs. Now you can get the data from hdfs using `hdfs dfs
> > -get`
> > > >> to
> > > >> > you
> > > >> >    local file-system. Or use `hdfs dfs -cat ... | application.py`
> to
> > > >> pipe
> > > >> > it
> > > >> >    to your application directly.
> > > >> >
> > > >> > What you are trying to accomplish, looks for me something that
> would
> > > fit
> > > >> > the spark-submit job, where you can submit pyspark applications
> > where
> > > >> you
> > > >> > can directly fetch the results from Spark:
> > > >> >
> > > >> > Welcome to
> > > >> >       ____              __
> > > >> >      / __/__  ___ _____/ /__
> > > >> >     _\ \/ _ \/ _ `/ __/  '_/
> > > >> >    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
> > > >> >       /_/
> > > >> >
> > > >> > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> > > >> > SparkSession available as 'spark'.
> > > >> > >>> spark.sql("SELECT 1 as count").first()
> > > >> > Row(count=1)
> > > >> >
> > > >> > Most of the time we use the Spark-sql to transform the data, then
> > use
> > > >> sqoop
> > > >> > to get the data from hdfs to a rdbms to expose the data to the
> > > business.
> > > >> > These examples are for Spark using hdfs, but for s3 it is somewhat
> > the
> > > >> > same.
> > > >> >
> > > >> > Does this answer your question, if not, could you elaborate the
> > > problem
> > > >> > that you are facing?
> > > >> >
> > > >> > Ciao, Fokko
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > 2017-10-13 15:54 GMT+02:00 Boris <[email protected]>:
> > > >> >
> > > >> > > hi guys,
> > > >> > >
> > > >> > > I opened JIRA on this and will be working on PR
> > > >> > > https://issues.apache.org/jira/browse/AIRFLOW-1713
> > > >> > >
> > > >> > > any objections/suggestions conceptually?
> > > >> > >
> > > >> > > Fokko, I see you have been actively contributing to spark hooks
> > and
> > > >> > > operators so I could use your opinion before I implement this.
> > > >> > >
> > > >> > > Boris
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Reply via email to