[
https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gaetan updated SPARK-33519:
---------------------------
Issue Type: New Feature (was: Wish)
> Batch UDF in scala
> ------------------
>
> Key: SPARK-33519
> URL: https://issues.apache.org/jira/browse/SPARK-33519
> Project: Spark
> Issue Type: New Feature
> Components: Optimizer, Spark Core
> Affects Versions: 3.0.0, 3.0.1
> Reporter: Gaetan
> Priority: Major
>
> Hello,
> Unlike Python, there is only one type of Scala UDF: one that lets us define a
> Scala function to apply to a set of Columns and which is called +for each
> row+. One advantage of a Scala UDF over mapPartitions is that Catalyst can
> see what the inputs are, which is then used for column pruning, predicate
> pushdown and other optimization rules. But in some use cases there is a
> setup phase that we want to execute only once per worker, right before
> processing the inputs. For such use cases a Scala UDF is not well suited, and
> mapPartitions is used instead, like this:
>
> {code:java}
> ds.mapPartitions { it =>
>   setup()
>   process(it)
> }
> {code}
> After having looked at the code, I figured out that Python UDFs are
> implemented via query plans that retrieve an RDD from their children and that
> call mapPartitions on that RDD to work with batches of inputs. These query
> plans are generated by Catalyst by extracting the Python UDFs (rule
> ExtractPythonUDFs).
>
> Like Python UDFs, Scala batch UDFs could be implemented with query plans that
> work with a batch of inputs instead of a single input. What do you think?
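> To make the idea concrete, here is a rough sketch of what the user-facing
> API might look like. This is purely hypothetical: {{batchUdf}} does not
> exist in Spark, and the names {{setup}} and {{process}} are placeholders.
> The point is that the function receives an iterator (like mapPartitions)
> while the call site still declares its input columns, so Catalyst can prune
> and push down:
>
> {code:java}
> // Hypothetical API sketch, not an existing Spark feature.
> // The UDF body receives all inputs of a partition as an iterator,
> // so setup() can run once per partition instead of once per row.
> val myBatchUdf = batchUdf[(String, Long), String] { inputs =>
>   setup() // runs once per partition, before any input is processed
>   inputs.map { case (name, value) => process(name, value) }
> }
>
> // Catalyst sees exactly which columns the UDF reads:
> df.select(myBatchUdf(col("name"), col("value")))
> {code}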
> Here is a short description of one of our Spark use cases that could
> greatly benefit from Scala batch UDFs:
> We are using Spark to distribute some computation run in C#. To do so, we
> call the mapPartitions method of the DataFrame that represents our data.
> Inside mapPartitions, we:
> * first connect to the C# process;
> * then iterate over the inputs, sending each input to the C# process and
> getting back the results.
> The use of mapPartitions was motivated by the setup (connecting to the C#
> process), which happens once per partition.
> Now that we have a first working version, we would like to improve it by
> limiting the columns that are read. We don't want to select the columns
> required by our computation right before the mapPartitions call, because
> that would filter out columns that other transformations in the workflow
> may still require. Instead, we would like to take advantage of Catalyst for
> column pruning, predicate pushdown and other optimization rules. Replacing
> the mapPartitions with a Scala UDF would not be efficient, because we would
> connect to the C# process for each row. An alternative would be a Scala
> "batch" UDF, applied to the columns needed by our computation so as to
> benefit from Catalyst and its optimization rules, and whose input would be
> an iterator, like mapPartitions.
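> For reference, our current approach looks roughly like this (a sketch only:
> {{connectToCSharpProcess}} and {{client.send}} stand in for our actual C#
> interop client, and the column names are made up):
>
> {code:java}
> // Sketch of the current pattern, with placeholder helpers.
> df.select("featureA", "featureB")      // manual pruning we would like Catalyst to do
>   .mapPartitions { rows =>
>     val client = connectToCSharpProcess() // setup, once per partition
>     rows.map(row => client.send(row))     // one round trip per input row
>   }
> {code}
>
> A batch UDF would let us drop the manual select while keeping the
> once-per-partition connection.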
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]