Vincent created SPARK-30670:
-------------------------------

             Summary: Pipes for PySpark
                 Key: SPARK-30670
                 URL: https://issues.apache.org/jira/browse/SPARK-30670
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 2.4.4
            Reporter: Vincent


I would propose to add a `pipe` method to a Spark DataFrame. It allows for a
functional programming pattern, inspired by the tidyverse, that is currently
missing. The pandas community also recently adopted this pattern, documented
[here|https://tomaugspurger.github.io/method-chaining.html].
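For comparison, pandas already ships `DataFrame.pipe` with exactly this
signature; a minimal sketch of the pattern there (the `add_session` helper is
only an illustrative stand-in, not part of any library):
{code:java}
import pandas as pd

def add_session(dataf, session_threshold=60 * 15):
    # illustrative stand-in for a user-defined transformation
    delta = dataf.groupby("user")["timestamp"].diff()
    return dataf.assign(session=(delta > session_threshold).groupby(dataf["user"]).cumsum())

# DataFrame.pipe applies the function to the frame and returns the result
clean_df = pd.read_parquet("<filepath>").pipe(add_session, session_threshold=900)
{code}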

Here is the idea. Suppose you have this:


{code:java}
from pyspark.sql import Window
from pyspark.sql import functions as sf

# file that has [user, date, timestamp, eventtype]
ddf = spark.read.parquet("<filepath>")

w_user = Window().partitionBy("user")
w_user_time = Window().partitionBy("user").orderBy("timestamp")

thres_sesstime = 60 * 15
min_n_rows = 10
min_n_sessions = 5

clean_ddf = (ddf
  # time since the user's previous event
  .withColumn("delta", sf.col("timestamp") - sf.lag("timestamp").over(w_user_time))
  # a gap larger than the threshold starts a new session
  .withColumn("new_session", (sf.col("delta") > thres_sesstime).cast("integer"))
  # running sum of session starts yields a per-user session id
  .withColumn("session", sf.sum(sf.col("new_session")).over(w_user_time))
  .drop("new_session")
  .drop("delta")
  # per-user totals used to filter out users with too little activity
  .withColumn("nrow_user", sf.count(sf.col("timestamp")).over(w_user))
  .withColumn("nrow_user_date", sf.approx_count_distinct(sf.col("date")).over(w_user))
  .filter(sf.col("nrow_user") > min_n_rows)
  .filter(sf.col("nrow_user_date") > min_n_sessions)
  .drop("nrow_user")
  .drop("nrow_user_date"))
{code}
The code works and it is somewhat clear. We add a session to the dataframe and
then we use this to remove outliers. The issue is that this chain of commands
can get quite long, so it might be better to turn it into functions.
{code:java}
def add_session(dataf, session_threshold=60 * 15):
    w_user_time = Window().partitionBy("user").orderBy("timestamp")

    return (dataf
      .withColumn("delta", sf.col("timestamp") - sf.lag("timestamp").over(w_user_time))
      .withColumn("new_session", (sf.col("delta") > session_threshold).cast("integer"))
      .withColumn("session", sf.sum(sf.col("new_session")).over(w_user_time))
      .drop("new_session")
      .drop("delta"))


def remove_outliers(dataf, min_n_rows=10, min_n_sessions=5):
    w_user = Window().partitionBy("user")

    return (dataf
      .withColumn("nrow_user", sf.count(sf.col("timestamp")).over(w_user))
      .withColumn("nrow_user_date", sf.approx_count_distinct(sf.col("date")).over(w_user))
      .filter(sf.col("nrow_user") > min_n_rows)
      .filter(sf.col("nrow_user_date") > min_n_sessions)
      .drop("nrow_user")
      .drop("nrow_user_date"))
{code}
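These verbs are also easy to unit test; a minimal sketch, assuming a
pytest-style `spark` fixture that yields a local SparkSession:
{code:java}
def test_add_session_splits_on_threshold(spark):
    # two events 20 minutes apart for the same user should not share a session
    dataf = spark.createDataFrame(
        [("u1", "2020-01-01", 0), ("u1", "2020-01-01", 60 * 20)],
        ["user", "date", "timestamp"],
    )
    rows = add_session(dataf, session_threshold=60 * 15).orderBy("timestamp").collect()
    assert rows[0]["session"] != rows[1]["session"]
{code}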
The issue does not lie in these functions. These functions are great: you can
unit test them (as sketched above) and they give you nice verbs that serve as
an abstraction. The issue is in how you now need to apply them.
{code:java}
remove_outliers(add_session(ddf, session_threshold=1000), min_n_rows=11)
{code}
It would be much nicer to allow for something like this:
{code:java}
(ddf
  .pipe(add_session, session_threshold=900)
  .pipe(remove_outliers, min_n_rows=11))
{code}
The nice thing about this is that it makes method chaining easy, and it also
gives you a clean way to separate high-level from low-level code. You still
allow tweaking at the high level by exposing keyword arguments, but when
debugging you can quickly find the lower-level code because the details are
contained in their own functions.

For code maintenance, I have relied on this pattern a lot personally. But so
far, I have always had to monkey-patch Spark to be able to do this:
{code:java}
from pyspark.sql import DataFrame

def pipe(self, func, *args, **kwargs):
    # apply `func` to the dataframe, forwarding any extra arguments
    return func(self, *args, **kwargs)

DataFrame.pipe = pipe
{code}
Could I perhaps add these few lines of code to the codebase?
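For concreteness, a minimal sketch of how the method itself might look on the
Python `DataFrame` class (naming and docstring mirror `pandas.DataFrame.pipe`;
this is only a suggestion, not a final API):
{code:java}
def pipe(self, func, *args, **kwargs):
    """Apply func(self, *args, **kwargs) and return its result.

    Lets user-defined transformations take part in method chaining.
    """
    return func(self, *args, **kwargs)
{code}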

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
