I want to compute cume_dist over a number of columns in a Spark DataFrame, but
I want to exclude NULL values before doing so.

I have the loop below in PySpark. While it works, the driver runs at 100%
while the executors sit mostly idle. I have read that looping over a
DataFrame like this is an anti-pattern and should be avoided. Any pointers on
how to optimize this section of PySpark code?

I am running this in the AWS Glue 3.0 environment.

from pyspark.sql import Window
from pyspark.sql import functions as F

for column_name, new_col in [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    ("alarm_cnt", "percentile_rank_alarm_cnt"),
    ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
    ("event_duration_adj", "percentile_rank_evt_duration_adj"),
    ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
    ("encounter_time", "percentile_rank_encounter_time"),
    ("encounter_time_pred", "percentile_pred_encounter_time"),
    ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
    ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
]:
    win = (
        Window.partitionBy("p_customer_name", "p_site_name", "year_month")
        .orderBy(F.col(column_name))
    )
    # split off the NULL rows, rank only the non-NULL rows, then stitch back
    df1 = df.filter(F.col(column_name).isNull())
    df2 = df.filter(F.col(column_name).isNotNull()).withColumn(
        new_col, F.round(F.cume_dist().over(win) * F.lit(100)).cast("integer")
    )
    df = df2.unionByName(df1, allowMissingColumns=True)

For some reason the version below seems to run faster, but it does not remove
the NULLs before computing the rank, so the NULL rows still count toward the
result for the non-NULL rows. I'm not sure whether this is a proper way to do
it either :(

for column_name, new_col in [
    ("event_duration", "percentile_rank_evt_duration"),
    ("event_duration_pred", "percentile_pred_evt_duration"),
    ("alarm_cnt", "percentile_rank_alarm_cnt"),
    ("alarm_cnt_pred", "percentile_pred_alarm_cnt"),
    ("event_duration_adj", "percentile_rank_evt_duration_adj"),
    ("event_duration_adj_pred", "percentile_pred_evt_duration_adj"),
    ("encounter_time", "percentile_rank_encounter_time"),
    ("encounter_time_pred", "percentile_pred_encounter_time"),
    ("encounter_time_adj", "percentile_rank_encounter_time_adj"),
    ("encounter_time_adj_pred", "percentile_pred_encounter_time_adj"),
]:
    win = (
        Window.partitionBy("p_customer_name", "p_site_name", "year_month")
        .orderBy(F.col(column_name))
    )
    # NULL rows are only masked in the output here; they still sit inside the
    # window, so they are counted when the rank is computed (and this uses
    # percent_rank rather than cume_dist)
    df = df.withColumn(
        new_col,
        F.when(F.col(column_name).isNull(), F.lit(None)).otherwise(
            F.round(F.percent_rank().over(win) * F.lit(100)).cast("integer")
        ),
    )

I'd appreciate any pointers on how to go about this.

thanks
Ramesh
