unsubscribe
What is the best way to organize a join within a foreach?
I have two tables: {users, orders}. In this example, let's say that for each 1 User in the users table, there are 10 Orders in the orders table. I have to use pyspark to generate a statement of Orders for each User. So, a single user will need his/her own list of Orders. Additionally, I need to send this statement to the real-world user via email (for example).

My first intuition was to apply a DataFrame.foreach() on the users DataFrame. This way, I can rely on the Spark workers to handle the email sending individually. However, I do not yet know the best way to get each User's Orders. I will soon try the following (pseudo-code):

```
users_df = ...   # users table
orders_df = ...  # orders table; poorly named here for max understandability in this context

def foreach_function(row):
    user_id = row.user_id
    user_orders_df = orders_df.where(f'user_id = {user_id}')
    # here, I'd get any User info from 'row'
    # then, I'd convert all 'user_orders' to JSON
    # then, I'd prepare the email and send it

users_df.foreach(foreach_function)
```

It is my understanding that if I do my user-specific work in the foreach function, I will capitalize on Spark's scalability when doing that work. However, I am worried about two things:

- If I take all Orders up front... will that work? Will I be taking too much? Will I be putting Orders on partitions that won't handle them (a different User)?
- If I create the filtered orders_df within the foreach function... will it work? Will that be too much I/O to the DB?

The question ultimately is: how can I achieve this goal efficiently? I have not yet tried anything here. I am doing so as we speak, but am suffering from choice-paralysis. Please and thank you.
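For illustration, here is a minimal sketch of the "take all Orders up front" variant mentioned in the question, assuming both tables expose a user_id column; the table names and the email-sending stub are placeholders rather than anything from the original post:

```
# Sketch of pre-aggregating each user's Orders before the per-user work.
# Table names and the email step are hypothetical placeholders.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

users_df = spark.table("users")    # assumed source
orders_df = spark.table("orders")  # assumed source

# Fold each user's Orders into one array-of-JSON column so that every row
# reaching the executors already carries that user's full statement data.
orders_per_user = (
    orders_df
    .groupBy("user_id")
    .agg(F.collect_list(F.to_json(F.struct(*orders_df.columns))).alias("orders_json"))
)

statements_df = users_df.join(orders_per_user, on="user_id", how="left")

def send_statements(rows):
    # One mail connection per partition instead of per row; building and
    # sending the statement is left as a stub here.
    for row in rows:
        pass  # build the statement from row.orders_json and email the user

statements_df.foreachPartition(send_statements)
```

Using foreachPartition rather than foreach keeps any per-connection setup (SMTP client, DB session) to once per partition while still letting the workers do the per-user work in parallel.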
Unsubscribing
Hello, does this mailing list have an administrator, please? I'm trying to unsubscribe, but to no avail. Many thanks. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Reg: Create Spark using a virtual machine through Chef
Hi team, this is Akhil. I am trying to set up Spark on a virtual machine through Chef. Could you please help us with how we can do this? If possible, could you please share the documentation. Regards, Akhil
Re: Use Spark Aggregator in PySpark
Hi,

For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className).

Enrico

On 23.04.23 at 23:42, Thomas Wang wrote:

Hi Spark Community,

I have implemented a custom Spark Aggregator (a subclass of org.apache.spark.sql.expressions.Aggregator). Now I'm trying to use it in a PySpark application, but for some reason I'm not able to trigger the function. Here is what I'm doing; could someone help me take a look? Thanks.

```
spark = self._gen_spark_session()
spark.udf.registerJavaFunction(
    name="MyAggrator",
    javaClassName="my.package.MyAggrator",
    returnType=ArrayType(elementType=LongType()),
)
```

The above code runs successfully. However, to call it, I assume I should do something like the following:

```
df = df.groupBy().agg(
    functions.expr("MyAggrator(input)").alias("output"),
)
```

But this one gives me the following error:

```
pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator doesn't implement any UDF interface
```

My question is: how can I use a Spark Aggregator defined in a jar file from PySpark? Thanks.

Thomas
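For completeness, here is a minimal sketch of the registerJavaUDAF route suggested above, assuming the jar containing my.package.MyAggrator is available to the driver and executors; the jar path and input table below are placeholders rather than details from the thread:

```
# Minimal sketch of the registerJavaUDAF suggestion above. The jar path and the
# "events" table are hypothetical placeholders; the class name follows the thread.
from pyspark.sql import SparkSession, functions

spark = (
    SparkSession.builder
    .config("spark.jars", "/path/to/my-aggregator.jar")  # hypothetical jar location
    .getOrCreate()
)

# Register the JVM class as an aggregate function rather than a scalar UDF.
spark.udf.registerJavaUDAF("MyAggrator", "my.package.MyAggrator")

df = spark.table("events")  # placeholder input with an 'input' column
result = df.groupBy().agg(functions.expr("MyAggrator(input)").alias("output"))
result.show()
```

The call inside agg() mirrors what the original message attempted; only the registration call changes from registerJavaFunction to registerJavaUDAF.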