I want to compute `cume_dist` on a bunch of columns in a Spark DataFrame, but I want each column's NULL values removed before the calculation for that column.
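For concreteness, here is a small plain-Python sketch of the intended result for a single window partition. The helper name `pct_cume_dist` is hypothetical. It assumes Spark's default ascending sort, which places NULLs first, and it uses the definition cume_dist(v) = (rows with value <= v) / (rows in the partition), scaled to 0 to 100 and rounded half up the way `F.round` does.

```python
from typing import List, Optional


def pct_cume_dist(values: List[Optional[float]], exclude_nulls: bool = True) -> List[Optional[int]]:
    """Hypothetical reference: cume_dist * 100 (rounded half up) for ONE partition.

    exclude_nulls=True  -> NULL rows are dropped before ranking (the goal here).
    exclude_nulls=False -> NULL rows stay in the window; with ascending order they
                           sort first, so they count in numerator and denominator.
    NULL inputs always map to NULL outputs.
    """
    non_null = [v for v in values if v is not None]
    n_nulls = 0 if exclude_nulls else len(values) - len(non_null)
    n = len(non_null) + n_nulls  # partition size used as the denominator
    out: List[Optional[int]] = []
    for v in values:
        if v is None:
            out.append(None)
            continue
        # rows that are <= v (peers included), plus any NULLs sorted before them
        le = sum(1 for w in non_null if w <= v) + n_nulls
        # integer round-half-up of le / n * 100
        out.append((le * 200 + n) // (2 * n))
    return out
```

With `[None, 10, 20, 30]`, excluding the NULL gives `[None, 33, 67, 100]`, whereas letting the NULL row stay in the window gives `[None, 50, 75, 100]`.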
I have the loop below in PySpark. It works, but the driver runs at 100% CPU while the executors sit idle most of the time. I have read that running a loop like this is an anti-pattern and should be avoided. Any pointers on how to optimize this section of PySpark code? I am running this in the AWS Glue 3.0 environment.

    from pyspark.sql import functions as F
    from pyspark.sql.functions import col, lit
    from pyspark.sql.window import Window

    for column_name, new_col in [
        ("event_duration", "percentile_rank_evt_duration"),
        ("event_duration_pred", "percentile_pred_evt_duration"),
        ("alarm_cnt", "percentile_rank_alarm_cnt"),
        ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
        ("event_duration_adj", "percentile_rank_evt_duration_adj"),
        ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
        ("encounter_time", "percentile_rank_encounter_time"),
        ("encounter_time_pred", "percentile_pred_encounter_time"),
        ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
        ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
    ]:
        win = (
            Window.partitionBy(["p_customer_name", "p_site_name", "year_month"])
            .orderBy(col(column_name))
        )
        # rows where this column is NULL are set aside...
        df1 = df.filter(F.col(column_name).isNull())
        # ...and cume_dist is computed over the non-NULL rows only
        df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
            new_col, F.round(F.cume_dist().over(win) * lit(100)).cast("integer")
        )
        # recombine; allowMissingColumns fills new_col with NULL for the df1 rows
        df = df2.unionByName(df1, allowMissingColumns=True)

For some reason the version below runs faster, but it does not remove the NULLs before computing the cume_dist.
I'm not sure this is a proper way to do it either :(

    from pyspark.sql import functions as F
    from pyspark.sql.functions import col, lit
    from pyspark.sql.window import Window

    for column_name, new_col in [
        ("event_duration", "percentile_rank_evt_duration"),
        ("event_duration_pred", "percentile_pred_evt_duration"),
        ("alarm_cnt", "percentile_rank_alarm_cnt"),
        ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
        ("event_duration_adj", "percentile_rank_evt_duration_adj"),
        ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
        ("encounter_time", "percentile_rank_encounter_time"),
        ("encounter_time_pred", "percentile_pred_encounter_time"),
        ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
        ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
    ]:
        win = (
            Window.partitionBy(["p_customer_name", "p_site_name", "year_month"])
            .orderBy(col(column_name))
        )
        # NULL rows still take part in the window here; they are only
        # masked to NULL in the output column afterwards
        df = df.withColumn(
            new_col,
            F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
                F.round(F.percent_rank().over(win) * lit(100)).cast("integer")
            ),
        )

I'd appreciate any pointers on how to go about this. Thanks,
Ramesh