[ https://issues.apache.org/jira/browse/SPARK-32334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163576#comment-17163576 ]
Robert Joseph Evans commented on SPARK-32334:
---------------------------------------------

Row-to-columnar and columnar-to-row are mostly figured out. There are some performance improvements we could probably make in the row-to-columnar transition. The real issue is going to be the columnar-to-columnar transition. Copying data from one columnar format to another in a performant way is a solvable problem, but we might need to special-case a few things or do code generation if we cannot come up with a good common API.

The issue is going to be with the desired batch size. Parquet and ORC output a batch size of 4096 rows by default, but each is a separate config. In-memory columnar storage wants 10000 rows by default, but also has a hard-coded soft limit of 4 MB compressed. The Arrow config, though, is for a maximum size of 10000 rows by default. So I am thinking that we want `SparkPlan` to optionally specify a maximum batch size instead of a target size. The row-to-columnar transition would just build up a batch until it hits the maximum size or the end of the input iterator.

The columnar-to-columnar transition is a little more complicated. It would have to copy out a range of rows from one batch into another batch. In the worst case this could mean that one batch comes in, in Arrow format, but we need to copy it to another batch so that we can split it up into the target size.

This should cover the use case for basic map-like UDFs. For UDFs like `FlatMapCoGroupsInPandasExec` there is no fixed batch size, and in fact it takes two iterators as input that are co-grouped together. If we wanted an operator like this to do columnar processing, we would have to replicate all of that processing for columnar Arrow-formatted data. That is starting to go beyond what I see as the scope of this JIRA, so I would prefer to stick with just `MapInPandasExec`, `MapPartitionsInRWithArrowExec`, and `ArrowEvalPythonExec` for now.
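A minimal sketch of the row-to-columnar idea described above: build up a batch until it hits the maximum size or the input iterator is exhausted. This is not Spark's actual API (the real transition is Scala writing into column vectors); rows are modeled here as plain tuples and the function name is illustrative.

```python
from typing import Iterator, List

def rows_to_batches(rows: Iterator[tuple], max_batch_size: int) -> Iterator[List[tuple]]:
    """Accumulate rows into batches of at most max_batch_size rows.

    Illustrative only: a real row-to-columnar transition would append each
    row's fields into per-column vectors (e.g. Arrow builders), not a list.
    """
    batch: List[tuple] = []
    for row in rows:
        batch.append(row)
        if len(batch) == max_batch_size:
            # Hit the maximum batch size: emit and start a fresh batch.
            yield batch
            batch = []
    if batch:
        # End of the input iterator: flush the final partial batch.
        yield batch
```

For example, ten rows with a maximum batch size of 4 come out as batches of 4, 4, and 2 rows.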
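The columnar-to-columnar case can be sketched the same way: copy ranges of rows out of incoming batches into outgoing batches of at most the target size, so an oversized Arrow batch gets split and undersized batches get coalesced. Again this is an illustrative model with hypothetical names, not Spark code; a real implementation would copy column-vector slices rather than per-row lists.

```python
from typing import Iterator, List

def rebatch(batches: Iterator[List[tuple]], target_size: int) -> Iterator[List[tuple]]:
    """Re-slice incoming batches into batches of at most target_size rows."""
    out: List[tuple] = []
    for batch in batches:
        i = 0
        while i < len(batch):
            # Copy a range of rows from the input batch into the output batch,
            # taking no more than fits under the target size.
            take = min(target_size - len(out), len(batch) - i)
            out.extend(batch[i:i + take])
            i += take
            if len(out) == target_size:
                yield out
                out = []
    if out:
        # Flush whatever remains at the end of the input.
        yield out
```

Two incoming 6-row batches with a target of 4 become three 4-row batches, which is exactly the worst case mentioned above: the second output batch straddles the boundary between two input batches.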
In follow-on work we can start to look at what it would take to support an ArrowBatchedGroupedIterator and an ArrowBatchedCoGroupedIterator.

> Investigate commonizing Columnar and Row data transformations
> --------------------------------------------------------------
>
>                 Key: SPARK-32334
>                 URL: https://issues.apache.org/jira/browse/SPARK-32334
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Thomas Graves
>            Priority: Major
>
> We introduced more Columnar Support with SPARK-27396.
> With that we recognized that there is code doing very similar transformations from ColumnarBatch or Arrow into InternalRow and vice versa. For instance:
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L56-L58]
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L389]
> We should investigate if we can commonize that code.
> We are also looking at making the internal caching serialization pluggable to allow for different cache implementations ([https://github.com/apache/spark/pull/29067]).
> It was recently brought up that we should investigate whether using the Data Source V2 API makes sense and is feasible for some of these transformations, to allow it to be easily extended.

--
This message was sent by Atlassian Jira (v8.3.4#803005)