Hi Boris,

That sounds like a nice DAG.
This is how I would do it:

1. First run the long-running query in a spark-sql operator, like you have
   now.
2. Create a Python function that builds a SparkSession within Python (using
   the PySpark API) and fetches the count from the partition that you've
   just created.
3. Create a BranchPythonOperator that invokes this function and branches
   based on whether the count is okay:
   - If the count is okay, branch downstream and continue with the normal
     execution.
   - If the count is off, terminate and send yourself an email/Slack message
     that the count is not as expected.

This will look something like this:

[image: Inline image 1]
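As a minimal sketch of the branching part, assuming Airflow 1.x-style
imports, an existing `dag` object, and the long-running INSERT OVERWRITE
task from step 1 bound to `spark_sql_insert`; the table name, partition
spec, threshold, task ids, and email address are all hypothetical:

    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.operators.python_operator import BranchPythonOperator
    from pyspark.sql import SparkSession

    # Hypothetical threshold: the smallest row count we consider healthy.
    EXPECTED_MIN_COUNT = 1000

    def check_partition_count():
        """Build a SparkSession, count the rows in the freshly written
        partition, and return the task_id of the branch to follow."""
        spark = (SparkSession.builder
                 .appName('partition_count_check')
                 .enableHiveSupport()
                 .getOrCreate())
        # 'staging_table' and the partition spec are hypothetical.
        count = spark.sql(
            "SELECT COUNT(*) AS cnt "
            "FROM staging_table WHERE ds = '2017-10-14'"
        ).first()['cnt']
        spark.stop()
        if count >= EXPECTED_MIN_COUNT:
            return 'continue_pipeline'
        return 'notify_bad_count'

    branch = BranchPythonOperator(
        task_id='branch_on_count',
        python_callable=check_partition_count,
        dag=dag,
    )

    # The happy path: normal downstream processing continues from here.
    continue_pipeline = DummyOperator(task_id='continue_pipeline', dag=dag)

    # The unhappy path: notify and stop.
    notify_bad_count = EmailOperator(
        task_id='notify_bad_count',
        to='you@example.com',  # hypothetical address
        subject='Partition count check failed',
        html_content='The row count of the new partition is not as expected.',
        dag=dag,
    )

    spark_sql_insert >> branch
    branch >> continue_pipeline
    branch >> notify_bad_count

Note that BranchPythonOperator expects the callable to return the task_id of
the task to run next; the tasks on the other branch are skipped.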
Would this solve your problem?

Cheers, Fokko

2017-10-14 13:42 GMT+02:00 Boris Tyukin <[email protected]>:

> Hi Fokko, thanks for your response, really appreciate it!
>
> Basically, in my case I have two Spark SQL queries:
>
> 1) The first query does an INSERT OVERWRITE to a partition and may take a
> while.
> 2) Then I run a second query right after it to get the count of rows of
> that partition.
> 3) I need to pass that count back to the Airflow DAG, and this count will
> be used by the next task in the DAG to decide whether this partition can
> be safely exchanged (using ALTER TABLE EXCHANGE PARTITION) with a
> production table partition.
>
> So I need some way to get that count of rows. My initial thought was to
> parse the log and extract the count, but it looks like even if I use a
> regex it does not quite work - Spark SQL writes query output to stdout,
> which the Airflow Spark SQL hook does not capture right now.
>
> If you can suggest a better solution for me, that would be great!
>
> Also, initially I wanted to count the rows and then do the ALTER TABLE
> EXCHANGE PARTITION in the same PySpark job, but I found out that Spark
> does not support this statement yet, so I have to use Hive.
>
> On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <[email protected]>
> wrote:
>
> > Hi Boris,
> >
> > Thank you for your question, and excuse me for the late response;
> > currently I'm on holiday.
> >
> > The solution that you suggest would not be my preferred choice.
> > Extracting results from a log using a regex is expensive in terms of
> > computational cost, and error-prone. My question is: what are you
> > trying to accomplish? For me there are two ways of using the Spark-sql
> > operator:
> >
> > 1. ETL using Spark: Instead of returning the results, write them back
> >    to a new table, or a new partition within the table. This data can
> >    be used downstream in the DAG. Also, this writes the data to HDFS,
> >    which is nice for persistence.
> > 2. Write the data in a simple and widely supported format (such as CSV)
> >    onto HDFS. Now you can get the data from HDFS to your local file
> >    system using `hdfs dfs -get`, or use `hdfs dfs -cat ... |
> >    application.py` to pipe it to your application directly.
> >
> > What you are trying to accomplish looks to me like something that would
> > fit a spark-submit job, where you can submit PySpark applications and
> > fetch the results from Spark directly:
> >
> > Welcome to
> >       ____              __
> >      / __/__  ___ _____/ /__
> >     _\ \/ _ \/ _ `/ __/ '_/
> >    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
> >       /_/
> >
> > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> > SparkSession available as 'spark'.
> > >>> spark.sql("SELECT 1 as count").first()
> > Row(count=1)
> >
> > Most of the time we use Spark-sql to transform the data, and then use
> > Sqoop to get the data from HDFS into an RDBMS to expose it to the
> > business. These examples are for Spark on HDFS, but for S3 it is much
> > the same.
> >
> > Does this answer your question? If not, could you elaborate on the
> > problem that you are facing?
> >
> > Ciao, Fokko
> >
> > 2017-10-13 15:54 GMT+02:00 Boris <[email protected]>:
> >
> > > Hi guys,
> > >
> > > I opened a JIRA on this and will be working on a PR:
> > > https://issues.apache.org/jira/browse/AIRFLOW-1713
> > >
> > > Any objections/suggestions, conceptually?
> > >
> > > Fokko, I see you have been actively contributing to the Spark hooks
> > > and operators, so I could use your opinion before I implement this.
> > >
> > > Boris
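For the exchange step in Boris's mail above, which has to run in Hive
because Spark SQL did not support EXCHANGE PARTITION yet, a minimal sketch
of the happy-path task using Airflow's HiveOperator, extending the branch
sketch earlier in this thread; the table names and partition spec are again
hypothetical:

    from airflow.operators.hive_operator import HiveOperator

    # Moves the verified partition from the staging table into the
    # production table; runs in Hive rather than Spark SQL because Spark
    # does not support EXCHANGE PARTITION. All names are hypothetical.
    exchange_partition = HiveOperator(
        task_id='exchange_partition',
        hql="""
            ALTER TABLE production_table
            EXCHANGE PARTITION (ds = '2017-10-14')
            WITH TABLE staging_table
        """,
        dag=dag,
    )

    # Only reached when the branch above chose 'continue_pipeline'.
    continue_pipeline >> exchange_partition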
