[ https://issues.apache.org/jira/browse/SPARK-32334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163576#comment-17163576 ]

Robert Joseph Evans commented on SPARK-32334:
---------------------------------------------

Row-to-columnar and columnar-to-row are mostly figured out.  There are some 
performance improvements that we could probably make in the row-to-columnar 
transition.  The real issue is going to be the columnar-to-columnar transition.  
Copying data from one columnar format to another in a performant way is a 
solvable problem, but we might need to special-case a few things or do code 
generation if we cannot come up with a good common API.  The main complication 
is the desired batch size.

Parquet and ORC output a batch size of 4096 rows by default, but each has a 
separate config.
The in-memory columnar storage wants 10000 rows by default, but also has a 
hard-coded soft limit of 4 MB compressed.
The Arrow config, though, is for a maximum size of 10000 rows by default.
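For reference, those defaults correspond to the following SQL confs as I understand them from the Spark 3.0-era code; a minimal sketch, and the exact key names are worth double-checking against the Spark version in use:

```python
# Spark SQL confs behind the batch-size defaults discussed above.
# Key names are from Spark 3.0-era sources; verify against your Spark version.
BATCH_SIZE_CONFS = {
    "spark.sql.parquet.columnarReaderBatchSize": 4096,     # Parquet vectorized reader
    "spark.sql.orc.columnarReaderBatchSize": 4096,         # ORC vectorized reader
    "spark.sql.inMemoryColumnarStorage.batchSize": 10000,  # cache (plus ~4 MB compressed soft limit)
    "spark.sql.execution.arrow.maxRecordsPerBatch": 10000, # Arrow: a maximum, not a target
}
```

Note that only the Arrow conf is a hard maximum; the reader confs are targets, which is exactly the mismatch described above.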

So I am thinking that we want `SparkPlan` to optionally specify a maximum batch 
size instead of a target size.  The row-to-columnar transition would just build 
up a batch until it hits that maximum or the end of the input iterator.  The 
columnar-to-columnar transition is a little more complicated: it would have to 
copy out a range of rows from one batch into another batch.  In the worst case 
this means a batch could come in, in Arrow format, and we would still need to 
copy it to another batch just so we can split it up to the maximum size.
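As a rough illustration of the two transitions, here is a plain-Python sketch with lists of rows standing in for `InternalRow`/`ColumnarBatch` data; the function names and shapes are stand-ins, not Spark APIs, and a real implementation would copy column ranges between Arrow/ColumnarBatch buffers rather than append rows:

```python
def rows_to_batches(rows, max_rows):
    """Row-to-columnar sketch: accumulate rows until the maximum batch
    size is hit or the input iterator is exhausted."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == max_rows:
            yield batch
            batch = []
    if batch:  # flush the partial batch at end of input
        yield batch

def rebatch(batches, max_rows):
    """Columnar-to-columnar sketch: re-slice incoming batches so no
    output batch exceeds max_rows, copying row ranges across batch
    boundaries (the worst case described above)."""
    pending = []
    for batch in batches:
        pending.extend(batch)  # stand-in for a column-range copy
        while len(pending) >= max_rows:
            out, pending = pending[:max_rows], pending[max_rows:]
            yield out
    if pending:
        yield pending
```

The carry-over buffer in `rebatch` is what makes this transition more expensive than row-to-columnar: an incoming batch can be split across, or merged into, multiple output batches.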

This should cover the use case for basic map-like UDFs.

For UDFs like `FlatMapCoGroupsInPandasExec` there is no fixed batch size; in 
fact it takes two iterators as input that are co-grouped together.  If we 
wanted an operator like this to do columnar processing, we would have to be 
able to replicate all of that processing for columnar, Arrow-formatted data. 
This is starting to go beyond what I see as the scope of this JIRA, and I would 
prefer to stick with just `MapInPandasExec`, `MapPartitionsInRWithArrowExec`, 
and `ArrowEvalPythonExec` for now.  In follow-on work we can start to look at 
what it would take to support an `ArrowBatchedGroupedIterator` and an 
`ArrowBatchedCoGroupedIterator`.

> Investigate commonizing Columnar and Row data transformations 
> --------------------------------------------------------------
>
>                 Key: SPARK-32334
>                 URL: https://issues.apache.org/jira/browse/SPARK-32334
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Priority: Major
>
> We introduced more Columnar Support with SPARK-27396.
> With that we recognized that there is code that is doing very similar 
> transformations from ColumnarBatch or Arrow into InternalRow and vice versa.  
> For instance: 
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L56-L58]
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L389]
> We should investigate if we can commonize that code.
> We are also looking at making the internal caching serialization pluggable to 
> allow for different cache implementations. 
> ([https://github.com/apache/spark/pull/29067]). 
> It was recently brought up that we should investigate whether using the data 
> source v2 api makes sense and is feasible for some of these transformations, 
> to allow them to be easily extended.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
