Hi Boris,

That sounds like a nice DAG.

This is how I would do it: First run the long running query in a spark-sql
operator like you have now. Create a python function that builds a
SparkSession within Python (using the Spark pyspark api) and fetches the
count from the spark partition that you've just created. Create a
BranchPythonOperator that will invoke this function, and based on, if the
count is ok or not, branch:

   - If the count is okay, branch downstream and continue with the normal
   execution.
   - If the count is off, terminate and send you and email/slack that the
   count is not as expected.

​This will look something like this:
[image: Inline afbeelding 1]​

Would this solve your problem?

Cheers, Fokko



2017-10-14 13:42 GMT+02:00 Boris Tyukin <[email protected]>:

> Hi Fokko, thanks for your response, really appreciate it!
>
> Basically in my case I have two Spark SQL queries:
>
> 1) the first query does INSERT OVERWRITE to a partition and may take a
> while for a while
> 2) then I run a second query right after it to get count of rows of that
> partition.
> 3) I need to pass that count back to airflow dag and this count will be
> used by the next task in the DAG to make a decision if this partition
> should be safely exchanged (using ALTER TABLE EXCHANGE PARTITION) with a
> production table partition.
>
> So I need somehow to get that count of rows. My initial though was to parse
> the log and extract that count but looks like even if i do regex it does
> not quite work - spark sql writes query output to stdout which airflow
> spark sql hook does not capture right now.
>
> if you can suggest a better solution for me it would be great!
>
> Also initially I wanted to count rows and then do ALTER TABLE EXCHANGE
> PARTITION in the same pyspark job but I found out that spark does not
> support this statement yet and I have to use Hive.
>
> On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <[email protected]>
> wrote:
>
> > Hi Boris,
> >
> > Thank you for your question and excuse me for the late response,
> currently
> > I'm on holiday.
> >
> > The solution that you suggest, would not be my preferred choice.
> Extracting
> > results from a log using a regex is expensive in terms of computational
> > costs, and error prone. My question is, what are you trying to
> accomplish?
> > For me there are two ways of using the Spark-sql operator:
> >
> >    1. ETL Using Spark: Instead of returning the results, write the
> results
> >    back to a new table, or a new partition within the table. This data
> can
> > be
> >    used downstream in the dag. Also, this will write the data to hdfs
> > which is
> >    nice for persistance.
> >    2. Write the data in a simple and widely supported format (such as
> csv)
> >    onto hdfs. Now you can get the data from hdfs using `hdfs dfs -get` to
> > you
> >    local file-system. Or use `hdfs dfs -cat ... | application.py` to pipe
> > it
> >    to your application directly.
> >
> > What you are trying to accomplish, looks for me something that would fit
> > the spark-submit job, where you can submit pyspark applications where you
> > can directly fetch the results from Spark:
> >
> > Welcome to
> >       ____              __
> >      / __/__  ___ _____/ /__
> >     _\ \/ _ \/ _ `/ __/  '_/
> >    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
> >       /_/
> >
> > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> > SparkSession available as 'spark'.
> > >>> spark.sql("SELECT 1 as count").first()
> > Row(count=1)
> >
> > Most of the time we use the Spark-sql to transform the data, then use
> sqoop
> > to get the data from hdfs to a rdbms to expose the data to the business.
> > These examples are for Spark using hdfs, but for s3 it is somewhat the
> > same.
> >
> > Does this answer your question, if not, could you elaborate the problem
> > that you are facing?
> >
> > Ciao, Fokko
> >
> >
> >
> >
> > 2017-10-13 15:54 GMT+02:00 Boris <[email protected]>:
> >
> > > hi guys,
> > >
> > > I opened JIRA on this and will be working on PR
> > > https://issues.apache.org/jira/browse/AIRFLOW-1713
> > >
> > > any objections/suggestions conceptually?
> > >
> > > Fokko, I see you have been actively contributing to spark hooks and
> > > operators so I could use your opinion before I implement this.
> > >
> > > Boris
> > >
> >
>

Reply via email to