[
https://issues.apache.org/jira/browse/SPARK-32334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163576#comment-17163576
]
Robert Joseph Evans commented on SPARK-32334:
---------------------------------------------
Row-to-columnar and columnar-to-row are mostly figured out. There are some
performance improvements that we could probably make in the row-to-columnar
transition. The issue is going to be with a columnar-to-columnar transition.
Copying data from one columnar format to another in a performant way is a
solvable problem, but we might need to special-case a few things or do code
generation if we cannot come up with a good common API. The real issue is going
to be the desired batch size.
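As background, the row-to-columnar side (buffering rows until a size limit is
hit, then emitting a batch) can be sketched in plain Scala. This is only an
illustration of the idea, not Spark's actual `RowToColumnarExec`; the
`Seq[Int]` row model and all names here are made up:

```scala
object RowToColumnar {
  // A "row" is modeled as a Seq[Int]; a "batch" as a Vector of column
  // arrays. Buffer up to `targetRows` rows per batch, then transpose the
  // buffered rows into columns.
  def toBatches(rows: Iterator[Seq[Int]], targetRows: Int): Iterator[Vector[Array[Int]]] =
    rows.grouped(targetRows).map { group =>
      val numCols = group.head.length
      // Column c is built from field c of every buffered row.
      Vector.tabulate(numCols)(c => group.map(_(c)).toArray)
    }
}
```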
Parquet and ORC output a batch of 4096 rows by default, but each is controlled
by a separate config.
The in-memory columnar storage wants 10000 rows by default, but also has a
hard-coded soft limit of 4 MB compressed.
The Arrow config, though, specifies a maximum of 10000 rows by default.
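Concretely, those are separate config keys with separate defaults (values as of
Spark 3.0; double-check them against the version you run):

```scala
// Assumes an existing SparkSession named `spark`.
// Each of these is an independent knob, which is part of the problem:
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", 4096)
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", 4096)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
```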
So I am thinking that we want `SparkPlan` to optionally specify a maximum batch
size instead of a target size. The row-to-columnar transition would just build
up a batch until it hits the maximum size or the end of the input iterator. The
columnar-to-columnar transition is a little more complicated: it would have to
copy out a range of rows from one batch into another batch. In the worst case
this could mean that a batch comes in already in Arrow format, but we still
need to copy it into new batches so that we can split it up to the maximum
size.
This should cover the use case for basic map like UDFs.
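The columnar-to-columnar re-slicing described above can be sketched in plain
Scala. Again this is a hypothetical model, not Spark's `ColumnarBatch` API: a
batch is a `Vector` of equal-length column arrays, and each incoming batch is
copied out as row ranges of at most `maxRows`:

```scala
object Rebatch {
  type Batch = Vector[Array[Int]] // one array per column, equal lengths

  // Split each incoming batch into output batches of at most `maxRows`
  // rows, copying the row range out of every column. Batches are never
  // merged, so outputs can be smaller than `maxRows` but never larger,
  // matching a "maximum size" rather than a "target size" contract.
  def rebatch(batches: Iterator[Batch], maxRows: Int): Iterator[Batch] =
    batches.flatMap { batch =>
      val numRows = if (batch.isEmpty) 0 else batch.head.length
      (0 until numRows by maxRows).iterator.map { start =>
        val end = math.min(start + maxRows, numRows)
        batch.map(col => col.slice(start, end)) // copy a row range per column
      }
    }
}
```

A real implementation would copy between `ColumnVector` implementations (or
generate code for it) rather than slicing arrays, but the batching logic is the
same.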
For UDFs like `FlatMapCoGroupsInPandasExec` there is no fixed batch size, and
in fact it takes two iterators as input that are co-grouped together. If we
wanted an operator like this to do columnar processing, we would have to be
able to replicate all of that processing for columnar, Arrow-formatted data.
This is starting to go beyond what I see as the scope of this JIRA, so I would
prefer to stick with just `MapInPandasExec`, `MapPartitionsInRWithArrowExec`,
and `ArrowEvalPythonExec` for now. In follow-on work we can start to look at
what it would take to support an `ArrowBatchedGroupedIterator` and an
`ArrowBatchedCoGroupedIterator`.
> Investigate commonizing Columnar and Row data transformations
> --------------------------------------------------------------
>
> Key: SPARK-32334
> URL: https://issues.apache.org/jira/browse/SPARK-32334
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Thomas Graves
> Priority: Major
>
> We introduced more Columnar Support with SPARK-27396.
> With that we recognized that there is code that is doing very similar
> transformations from ColumnarBatch or Arrow into InternalRow and vice versa.
> For instance:
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L56-L58]
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L389]
> We should investigate if we can commonize that code.
> We are also looking at making the internal caching serialization pluggable to
> allow for different cache implementations.
> ([https://github.com/apache/spark/pull/29067]).
> It was recently brought up that we should investigate whether using the Data
> Source V2 API makes sense and is feasible for some of these transformations,
> to allow them to be easily extended.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)