[ https://issues.apache.org/jira/browse/SPARK-33519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gaetan updated SPARK-33519:
---------------------------
Description: 

Hello,

Contrary to Python, there is only one kind of Scala UDF: it lets us define a Scala function that is applied to a set of Columns and is called +for each row+. One advantage of a Scala UDF over mapPartitions is that Catalyst can see what the inputs are, which enables column pruning, predicate pushdown and other optimization rules. But in some use cases there is a setup phase that we only want to execute once per worker, right before processing the inputs. For such use cases a Scala UDF is not well suited, and mapPartitions is used instead, like this:
{code:java}
ds.mapPartitions(
  it => {
    setup()
    process(it)
  }
){code}
After looking at the code, I figured out that Python UDFs are implemented via query plans that retrieve an RDD from their children and call mapPartitions on that RDD to work with batches of inputs. These query plans are generated by Catalyst by extracting the Python UDFs (rule ExtractPythonUDFs).
-------------
*Implementation details*:

1. We could implement a new Expression ScalaBatchUDF and add a boolean isBatch to Expression that tells whether an Expression is a batch expression or not. The SparkPlans ProjectExec, FilterExec (and probably more) would be modified to handle batch Expressions:
* Generated code would call a batch Expression with a batch of inputs instead of a single input.
* The doExecute() method would call a batch Expression with a batch of inputs instead of a single input.

A SparkPlan could then be composed of both "single" Expressions and batch Expressions. This is a first idea that would need to be refined.

2. Another solution could be to do as for Python UDFs: a batch UDF, implemented as an Expression, is extracted from the query plan it belongs to and transformed into a ScalaBatchUDFExec query plan (which becomes a child of the query plan that the batch UDF belongs to).

What do you think?
-------------
Here is a very short description of *one of our use cases of Spark* that could greatly benefit from Scala batch UDFs:

We are using Spark to distribute some computation run in C#. To do so, we call the mapPartitions method of the DataFrame that represents our data. Inside mapPartitions, we:
* first connect to the C# process,
* then iterate over the inputs, sending each input to the C# process and getting back the results.

The use of mapPartitions was motivated by the setup (connection to the C# process), which happens once per partition.

Now that we have a first working version, we would like to improve it by limiting the columns that are read. We don't want to select the columns required by our computation right before the mapPartitions, because that would drop columns that could be required by other transformations in the workflow. Instead, we would like to take advantage of Catalyst for column pruning, predicate pushdown and other optimization rules. Using a Scala UDF to replace the mapPartitions would not be efficient, because we would connect to the C# process for each row. An alternative would be a Scala "batch" UDF that is applied to the columns needed by our computation, so that it benefits from Catalyst and its optimization rules, and whose input would be an iterator, as with mapPartitions. A sketch of the current approach is given below.
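To make the use case above concrete, here is a minimal sketch of the current mapPartitions-based approach using the standard Dataset API. The CSharpWorker / CSharpConnection names, the column names "a" and "b" and the computation itself are placeholders invented for this sketch, not our real code; the point is only that the input columns have to be selected by hand, because Catalyst cannot see which columns the closure reads:
{code:scala}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Placeholder stand-ins for the external C# process described above.
// The names and the computation are assumptions made only for this sketch.
class CSharpConnection {
  def compute(a: Double, b: Double): Double = a + b // dummy computation
  def close(): Unit = ()
}

object CSharpWorker {
  def connect(): CSharpConnection = new CSharpConnection
}

object BatchUdfUseCase {

  // Current approach: mapPartitions with a setup phase run once per partition.
  def run(spark: SparkSession, df: DataFrame): Dataset[Double] = {
    import spark.implicits._

    // The needed columns must be selected by hand: Catalyst does not see which
    // inputs the closure below reads, so it cannot prune columns for us.
    df.select("a", "b").as[(Double, Double)].mapPartitions { rows =>
      val conn = CSharpWorker.connect() // setup, once per partition
      // In real code the connection should be closed once the iterator is
      // fully consumed (e.g. by wrapping it); omitted here for brevity.
      rows.map { case (a, b) => conn.compute(a, b) }
    }
  }
}
{code}
With a Scala batch UDF as proposed above, the per-partition connection setup and the iterator-based loop would move into a UDF applied only to the needed columns, so the manual select would no longer be necessary and Catalyst could prune columns and push down predicates, just as it does for row-by-row Scala UDFs.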
> Batch UDF in scala
> ------------------
>
> Key: SPARK-33519
> URL: https://issues.apache.org/jira/browse/SPARK-33519
> Project: Spark
> Issue Type: New Feature
> Components: Optimizer, Spark Core
> Affects Versions: 3.0.0, 3.0.1
> Reporter: Gaetan
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org