Michael Tong created SPARK-42645:
------------------------------------
Summary: Introduce feature to allow for function caching across
input rows.
Key: SPARK-42645
URL: https://issues.apache.org/jira/browse/SPARK-42645
Project: Spark
Issue Type: Wish
Components: Optimizer
Affects Versions: 3.3.2
Reporter: Michael Tong
Introduce the ability to make functions cacheable across input rows. I'm
imagining this feature working similarly to Python's
[functools.cache|https://docs.python.org/3/library/functools.html#functools.cache],
where you could add a decorator to certain expensive functions that you know
will regularly encounter repeated values as they read the input data.
With this new feature you would be able to significantly speed up many real
world jobs that apply expensive functions to data that naturally has repeated
column values. An example of this would be parsing user agent fields from
internet traffic logs partitioned by user id. Even though the data is not
sorted by user agent, a sample of 10k contiguous rows would contain far fewer
than 10k unique values, because popular user agents appear in a large
fraction of traffic and the user agent of the first event from a user is likely
to be shared by all subsequent events from that user. Currently you can hack an
approximation of this in Python via pandas_udfs. This works because pandas_udfs
by default read batches of 10k input rows, so you can use a caching UDF whose
cache is recreated every 10k rows. At my current job I have noticed that some
applications of this trick can significantly speed up queries where custom UDFs
are the bottleneck.
An example of this is
{code:python}
import functools

import pyspark.sql.functions as F
import pyspark.sql.types as T

@F.pandas_udf(T.StringType())
def parse_user_agent_field(user_agent_series):
    # The cache is rebuilt for each 10k-row batch the pandas_udf receives.
    @functools.cache
    def parse_user_agent_field_helper(user_agent):
        # parse the user agent and return the relevant field
        return None
    return user_agent_series.apply(parse_user_agent_field_helper)
{code}
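To illustrate why this trick pays off, here is a minimal, Spark-free sketch of the same idea. The `process_batch` function and the `value.upper()` stand-in for real user agent parsing are hypothetical; the point is that `functools.cache` collapses repeated inputs within a batch to one real computation per distinct value:

```python
import functools

call_count = 0  # number of "expensive" parses actually performed

def process_batch(values):
    # Fresh cache per batch, mirroring how a pandas_udf
    # sees one 10k-row batch at a time.
    @functools.cache
    def parse(value):
        global call_count
        call_count += 1
        return value.upper()  # stand-in for expensive parsing
    return [parse(v) for v in values]

# 8 rows but only 3 distinct user agents -> only 3 real parses
batch = ["firefox", "chrome", "firefox", "safari",
         "chrome", "chrome", "firefox", "safari"]
result = process_batch(batch)
```

The more skewed the value distribution within a batch, the closer the cost drops toward one parse per distinct value rather than one per row.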
It would be nice if there was some official support for this behavior for both
built in functions and UDFs. If there was official support for this I'd imagine
it to look something like
{code:python}
# using the pyspark dataframe API
df = df.withColumn(output_col, F.cache(F.function)(input_col))
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)