Great, this is what I expected to hear, but I wanted to double-check. Thanks for all your help, Fokko
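For later readers: the client/cluster distinction settled in the thread below is spark-submit's `--deploy-mode` flag. A minimal sketch of the two modes (the application path is hypothetical, not from the thread):

```shell
# Driver runs inside the YARN cluster, so the Airflow worker is not
# burdened with the driver's workload (Fokko's recommendation):
spark-submit --master yarn --deploy-mode cluster my_job.py

# Driver runs on the submitting machine itself, i.e. on the Airflow
# worker (the yarn-client behaviour Boris describes):
spark-submit --master yarn --deploy-mode client my_job.py
```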
On Mon, Oct 16, 2017 at 1:08 PM, Driesprong, Fokko <fo...@driesprong.frl> wrote:

> Hi Boris,
>
> When kicking off Spark jobs using Airflow, cluster mode is highly
> recommended, since the workload of the driver is then on the Hadoop
> cluster and not on the Airflow machine itself. Personally I prefer the
> spark-submit operator, since it pulls all the connection variables
> directly from Airflow, and you end up with a central place (Airflow
> connections) where all the configuration is kept. Otherwise you end up
> with configuration inside your Airflow logic.
>
> Cheers, Fokko
>
> 2017-10-15 17:16 GMT+02:00 Boris <boris...@gmail.com>:
>
> > Thanks Fokko. Do you know if it is better to use pyspark directly
> > within a PythonOperator, or to invoke spark-submit instead? My
> > understanding is that in both cases Airflow uses the yarn-client
> > deploy mode, not yarn-cluster, so the Spark driver always runs on the
> > same node as the Airflow worker. Not sure that is the best practice...
> >
> > On Oct 15, 2017 05:04, "Driesprong, Fokko" <fo...@driesprong.frl> wrote:
> >
> > > Hi Boris,
> > >
> > > Instead of writing it to a file, you can also write it to XCom; this
> > > will keep everything inside of Airflow. My personal opinion on this:
> > > spark-sql is a bit limited by nature, since it only supports SQL. If
> > > you want to do more dynamic stuff, you will eventually have to move
> > > to spark-submit anyway.
> > >
> > > Cheers, Fokko
> > >
> > > 2017-10-14 14:45 GMT+02:00 Boris <boris...@gmail.com>:
> > >
> > > > Thanks Fokko, I think that will do it, but my concern is that in
> > > > this case my DAG will initiate two separate Spark sessions, and it
> > > > takes about 20 seconds in our YARN environment to create one. I
> > > > need to run 600 DAGs like that every morning.
> > > >
> > > > I am thinking now of creating a pyspark job that will do the
> > > > insert and the count and write the count to a temp file. Still not
> > > > ideal...
> > > > I wish I could just parse Spark SQL instead...
> > > >
> > > > On Oct 14, 2017 8:05 AM, "Driesprong, Fokko" <fo...@driesprong.frl> wrote:
> > > >
> > > > > Hi Boris,
> > > > >
> > > > > That sounds like a nice DAG.
> > > > >
> > > > > This is how I would do it: first, run the long-running query in
> > > > > a spark-sql operator like you have now. Then create a Python
> > > > > function that builds a SparkSession within Python (using the
> > > > > pyspark API) and fetches the count from the Spark partition that
> > > > > you've just created. Create a BranchPythonOperator that will
> > > > > invoke this function and, based on whether the count is okay or
> > > > > not, branch:
> > > > >
> > > > > - If the count is okay, branch downstream and continue with the
> > > > >   normal execution.
> > > > > - If the count is off, terminate and send yourself an
> > > > >   email/Slack message that the count is not as expected.
> > > > >
> > > > > This will look something like this:
> > > > > [image: Inline afbeelding 1]
> > > > >
> > > > > Would this solve your problem?
> > > > >
> > > > > Cheers, Fokko
> > > > >
> > > > > 2017-10-14 13:42 GMT+02:00 Boris Tyukin <bo...@boristyukin.com>:
> > > > >
> > > > > > Hi Fokko, thanks for your response, really appreciate it!
> > > > > >
> > > > > > Basically, in my case I have two Spark SQL queries:
> > > > > >
> > > > > > 1) The first query does an INSERT OVERWRITE to a partition and
> > > > > >    may take a while.
> > > > > > 2) Then I run a second query right after it to get the count
> > > > > >    of rows in that partition.
> > > > > > 3) I need to pass that count back to the Airflow DAG, and it
> > > > > >    will be used by the next task in the DAG to decide whether
> > > > > >    this partition can be safely exchanged (using ALTER TABLE
> > > > > >    EXCHANGE PARTITION) with a production table partition.
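A minimal sketch of the branching piece Fokko describes above: a plain Python function that a BranchPythonOperator could invoke, returning the id of the task to run next. The task ids and threshold here are hypothetical, and in a real DAG the count would arrive from the Spark job (e.g. via XCom) rather than as a direct argument.

```python
def choose_branch(partition_count, expected_min=1):
    """Pick the downstream task id based on the partition's row count."""
    if partition_count >= expected_min:
        # Count is okay: continue with the normal execution, e.g. the
        # ALTER TABLE EXCHANGE PARTITION step.
        return "exchange_partition"
    # Count is off: leave the happy path and alert via email/Slack.
    return "notify_bad_count"

print(choose_branch(42))  # exchange_partition
print(choose_branch(0))   # notify_bad_count
```

Wired into a DAG, this function would be the `python_callable` of the BranchPythonOperator, with `"exchange_partition"` and `"notify_bad_count"` as the ids of the two downstream tasks.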
> > > > > > So I somehow need to get that count of rows. My initial
> > > > > > thought was to parse the log and extract the count, but it
> > > > > > looks like even a regex does not quite work - Spark SQL writes
> > > > > > the query output to stdout, which the Airflow spark-sql hook
> > > > > > does not capture right now.
> > > > > >
> > > > > > If you can suggest a better solution for me, that would be
> > > > > > great!
> > > > > >
> > > > > > Also, initially I wanted to count the rows and then do the
> > > > > > ALTER TABLE EXCHANGE PARTITION in the same pyspark job, but I
> > > > > > found out that Spark does not support this statement yet, so I
> > > > > > have to use Hive.
> > > > > >
> > > > > > On Sat, Oct 14, 2017 at 4:53 AM, Driesprong, Fokko <fo...@driesprong.frl> wrote:
> > > > > >
> > > > > > > Hi Boris,
> > > > > > >
> > > > > > > Thank you for your question, and excuse me for the late
> > > > > > > response; currently I'm on holiday.
> > > > > > >
> > > > > > > The solution that you suggest would not be my preferred
> > > > > > > choice. Extracting results from a log using a regex is
> > > > > > > expensive in terms of computational cost, and error-prone.
> > > > > > > My question is: what are you trying to accomplish? For me
> > > > > > > there are two ways of using the spark-sql operator:
> > > > > > >
> > > > > > > 1. ETL using Spark: instead of returning the results, write
> > > > > > >    them back to a new table, or to a new partition within
> > > > > > >    the table. This data can be used downstream in the DAG.
> > > > > > >    Also, this writes the data to HDFS, which is nice for
> > > > > > >    persistence.
> > > > > > > 2. Write the data in a simple and widely supported format
> > > > > > >    (such as csv) onto HDFS. Now you can get the data from
> > > > > > >    HDFS onto your local file-system using `hdfs dfs -get`.
> > > > > > >    Or use `hdfs dfs -cat ... | application.py` to pipe it
> > > > > > >    to your application directly.
> > > > > > >
> > > > > > > What you are trying to accomplish looks to me like something
> > > > > > > that would fit spark-submit, where you can submit pyspark
> > > > > > > applications and directly fetch the results from Spark:
> > > > > > >
> > > > > > > Welcome to
> > > > > > >       ____              __
> > > > > > >      / __/__  ___ _____/ /__
> > > > > > >     _\ \/ _ \/ _ `/ __/  '_/
> > > > > > >    /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
> > > > > > >       /_/
> > > > > > >
> > > > > > > Using Python version 2.7.14 (default, Oct 11 2017 10:13:33)
> > > > > > > SparkSession available as 'spark'.
> > > > > > > >>> spark.sql("SELECT 1 as count").first()
> > > > > > > Row(count=1)
> > > > > > >
> > > > > > > Most of the time we use spark-sql to transform the data,
> > > > > > > then use Sqoop to get the data from HDFS into an RDBMS to
> > > > > > > expose the data to the business. These examples are for
> > > > > > > Spark on HDFS, but for S3 it is somewhat the same.
> > > > > > >
> > > > > > > Does this answer your question? If not, could you elaborate
> > > > > > > on the problem that you are facing?
> > > > > > >
> > > > > > > Ciao, Fokko
> > > > > > >
> > > > > > > 2017-10-13 15:54 GMT+02:00 Boris <boris...@gmail.com>:
> > > > > > >
> > > > > > > > hi guys,
> > > > > > > >
> > > > > > > > I opened a JIRA on this and will be working on a PR:
> > > > > > > > https://issues.apache.org/jira/browse/AIRFLOW-1713
> > > > > > > >
> > > > > > > > Any objections/suggestions, conceptually?
> > > > > > > >
> > > > > > > > Fokko, I see you have been actively contributing to the
> > > > > > > > Spark hooks and operators, so I could use your opinion
> > > > > > > > before I implement this.
> > > > > > > >
> > > > > > > > Boris
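As a footnote on Fokko's second option, the `hdfs dfs -cat ... | application.py` pattern boils down to a local script that reads the small result from stdin. A minimal sketch, assuming the Spark job wrote a single integer row count to a text file on HDFS (path and format are illustrative):

```python
import io

def read_count(stream):
    """Parse a single row count from a text stream."""
    return int(stream.read().strip())

# In the piped scenario the stream is sys.stdin:
#   hdfs dfs -cat /warehouse/tmp/row_count.txt | python application.py
# and the script would call read_count(sys.stdin). Demonstrated here
# with an in-memory stream instead:
print(read_count(io.StringIO("600\n")))  # 600
```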