Thanks Fokko, I think it will do it but my concern that in this case my dag will initiate two separate spark sessions and it takes about 20 seconds in our yarn environment to create it. I need to run 600 dags like that every morning.
I am thinking now to create a pyspark job that will do insert and count and write it to a temp file. Still not ideal... I wish I could just parse spark SQL instead.. On Oct 14, 2017 8:05 AM, "Driesprong, Fokko" <[email protected]> wrote: > Hi Boris, > > That sounds like a nice DAG. > > This is how I would do it: First run the long running query in a spark-sql > operator like you have now. Create a python function that builds a > SparkSession within Python (using the Spark pyspark api) and fetches the > count from the spark partition that you've just created. Create a > BranchPythonOperator that will invoke this function, and based on, if the > count is ok or not, branch: > > - If the count is okay, branch downstream and continue with the normal > execution. > - If the count is off, terminate and send you and email/slack that the > count is not as expected. > > This will look something like this: > [image: Inline afbeelding 1] > > Would this solve your problem? > > Cheers, Fokko > > > > 2017-10-14 13:42 GMT+02:00 Boris Tyukin <[email protected]>: > >> Hi Fokko, thanks for your response, really appreciate it! >> >> Basically in my case I have two Spark SQL queries: >> >> 1) the first query does INSERT OVERWRITE to a partition and may take a >> while for a while >> 2) then I run a second query right after it to get count of rows of that >> partition. >> 3) I need to pass that count back to airflow dag and this count will be >> used by the next task in the DAG to make a decision if this partition >> should be safely exchanged (using ALTER TABLE EXCHANGE PARTITION) with a >> production table partition. >> >> So I need somehow to get that count of rows. My initial though was to >> parse >> the log and extract that count but looks like even if i do regex it does >> not quite work - spark sql writes query output to stdout which airflow >> spark sql hook does not capture right now. >> >> if you can suggest a better solution for me it would be great! >> >> Also initially I wanted to count rows and then do ALTER TABLE EXCHANGE >> PARTITION in the same pyspark job but I found out that spark does not >> support this statement yet and I have to use Hive. >> >> On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <[email protected]> >> wrote: >> >> > Hi Boris, >> > >> > Thank you for your question and excuse me for the late response, >> currently >> > I'm on holiday. >> > >> > The solution that you suggest, would not be my preferred choice. >> Extracting >> > results from a log using a regex is expensive in terms of computational >> > costs, and error prone. My question is, what are you trying to >> accomplish? >> > For me there are two ways of using the Spark-sql operator: >> > >> > 1. ETL Using Spark: Instead of returning the results, write the >> results >> > back to a new table, or a new partition within the table. This data >> can >> > be >> > used downstream in the dag. Also, this will write the data to hdfs >> > which is >> > nice for persistance. >> > 2. Write the data in a simple and widely supported format (such as >> csv) >> > onto hdfs. Now you can get the data from hdfs using `hdfs dfs -get` >> to >> > you >> > local file-system. Or use `hdfs dfs -cat ... | application.py` to >> pipe >> > it >> > to your application directly. >> > >> > What you are trying to accomplish, looks for me something that would fit >> > the spark-submit job, where you can submit pyspark applications where >> you >> > can directly fetch the results from Spark: >> > >> > Welcome to >> > ____ __ >> > / __/__ ___ _____/ /__ >> > _\ \/ _ \/ _ `/ __/ '_/ >> > /__ / .__/\_,_/_/ /_/\_\ version 2.2.0 >> > /_/ >> > >> > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33) >> > SparkSession available as 'spark'. >> > >>> spark.sql("SELECT 1 as count").first() >> > Row(count=1) >> > >> > Most of the time we use the Spark-sql to transform the data, then use >> sqoop >> > to get the data from hdfs to a rdbms to expose the data to the business. >> > These examples are for Spark using hdfs, but for s3 it is somewhat the >> > same. >> > >> > Does this answer your question, if not, could you elaborate the problem >> > that you are facing? >> > >> > Ciao, Fokko >> > >> > >> > >> > >> > 2017-10-13 15:54 GMT+02:00 Boris <[email protected]>: >> > >> > > hi guys, >> > > >> > > I opened JIRA on this and will be working on PR >> > > https://issues.apache.org/jira/browse/AIRFLOW-1713 >> > > >> > > any objections/suggestions conceptually? >> > > >> > > Fokko, I see you have been actively contributing to spark hooks and >> > > operators so I could use your opinion before I implement this. >> > > >> > > Boris >> > > >> > >> > >
