Hongze Zhang created SPARK-57468:
------------------------------------
Summary: Extend SQL query plan API to support custom ColumnarBatch
types
Key: SPARK-57468
URL: https://issues.apache.org/jira/browse/SPARK-57468
Project: Spark
Issue Type: New Feature
Components: SQL
Affects Versions: 4.1.2
Reporter: Hongze Zhang
h3. Summary
This is a proposal to extend Spark's SQL query plan API to support custom
ColumnarBatch types.
h3. Background
Many Spark SQL plugin projects currently ship their own implementations of
Spark's ColumnarBatch. However, Spark is unable to identify and handle them
well during the query planning phase.
For example, Spark's `RowToColumnarExec`[1] can only convert InternalRow to
Spark's built-in column vectors. This makes the rule
`ApplyColumnarRulesAndInsertTransitions` useless for projects like Apache
Gluten when adding row-to-columnar transitions, because such projects rely on
custom vector types.
As another example, Spark has no `ColumnarToColumnar` transition to convert
vectorized data between different types, e.g., between Parquet / ORC vectors
and Arrow vectors, or custom vector types if a third-party plugin is used.
h3. Goal
The goal is to extend Spark's ability to handle multiple / custom
ColumnarBatch types, empowering downstream SQL plugins and making it much
easier for them to integrate their own vectorized data layouts with Apache
Spark. It should also make it possible for them to work together (e.g.,
Gluten + Lance) under the same standardized specification, without substantial
design conflicts.
h3. The Change
To achieve the above goal, the proposed solution is to add a `convention` API
to `SparkPlan`, consisting of `RowType` / `BatchType` to let the query planner
know the concrete data layout that this plan node consumes / produces.
The shape can be very similar to the `GlutenPlan` API[2] in Apache Gluten, but
optimized and adapted to stay backward compatible with Spark's code base.
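To make the idea more concrete, below is a minimal, self-contained sketch of what such a convention could look like. It is not the proposed API: `ConventionSketch`, `PlanNode`, `Leaf`, `Unary`, `outputConvention`, `requiredChildConventions` and `mismatchedChildren` are hypothetical names used only for illustration, and `PlanNode` is a stand-in for `SparkPlan`.

```scala
// Illustrative sketch only; every name here is an assumption, not a Spark API.
object ConventionSketch {
  // Row layouts a plan node may produce. NoRow means "produces no rows".
  sealed trait RowType
  case object NoRow extends RowType
  case object VanillaRow extends RowType // stands for Spark's InternalRow

  // Batch layouts are left open so that plugins can define their own.
  trait BatchType
  case object NoBatch extends BatchType
  case object VanillaBatch extends BatchType // stands for built-in column vectors

  // The layout a plan node consumes or produces.
  final case class Convention(rowType: RowType, batchType: BatchType)

  // Stand-in for SparkPlan, reduced to what the planner needs here.
  trait PlanNode {
    def children: Seq[PlanNode]
    def outputConvention: Convention
    // By default a node expects its children to produce its own layout.
    def requiredChildConventions: Seq[Convention] =
      children.map(_ => outputConvention)
  }

  final case class Leaf(outputConvention: Convention) extends PlanNode {
    val children: Seq[PlanNode] = Nil
  }

  final case class Unary(child: PlanNode, outputConvention: Convention)
      extends PlanNode {
    val children: Seq[PlanNode] = Seq(child)
  }

  // Indices of the children whose output layout differs from what the parent
  // requires, i.e. the places where a transition would have to be inserted.
  def mismatchedChildren(node: PlanNode): Seq[Int] =
    node.children.zip(node.requiredChildConventions).zipWithIndex.collect {
      case ((child, required), i) if child.outputConvention != required => i
    }
}
```

In this sketch, a batch-producing `Unary` node placed on top of a row-producing `Leaf` reports child `0` as mismatched, which is where the planner would need a row-to-columnar transition.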
`RowType` / `BatchType` can be registered along with their supported
transitions, e.g.,
{code:scala}
object VeloxBatchType extends Convention.BatchType {
  override protected def registerTransitions(): Unit = {
    fromRow(Convention.RowType.VanillaRowType, RowToVeloxColumnarExec.apply)
    toRow(Convention.RowType.VanillaRowType, VeloxColumnarToRowExec.apply)
  }
}
{code}
All registered transitions form a graph, which is searched in
`ApplyColumnarRulesAndInsertTransitions` to determine the final combination of
transitions inserted between adjacent query plan nodes.
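As a rough illustration of the graph search, the sketch below registers one-step transitions as directed edges and uses a breadth-first search to find the chain with the fewest hops between two layouts. Choosing the path by hop count is an assumption made here for simplicity; the actual rule may use a different criterion. `TransitionGraphSketch`, `Layout`, `Transition`, `Registry` and `findPath` are hypothetical names.

```scala
import scala.collection.mutable

// Illustrative sketch only; every name here is an assumption, not a Spark API.
object TransitionGraphSketch {
  // A data layout (row or batch type), identified by name in this sketch.
  final case class Layout(name: String)

  // A registered one-step transition between two layouts.
  final case class Transition(from: Layout, to: Layout, name: String)

  final class Registry {
    // Outgoing edges of each layout.
    private val edges = mutable.Map.empty[Layout, Vector[Transition]]

    def register(t: Transition): Unit =
      edges.update(t.from, edges.getOrElse(t.from, Vector.empty) :+ t)

    // Breadth-first search: returns the chain of transitions with the fewest
    // hops from `from` to `to`, or None if `to` is unreachable.
    def findPath(from: Layout, to: Layout): Option[List[Transition]] = {
      val visited = mutable.Set(from)
      val queue = mutable.Queue((from, List.empty[Transition]))
      var result: Option[List[Transition]] = None
      while (result.isEmpty && queue.nonEmpty) {
        val (layout, reversedPath) = queue.dequeue()
        if (layout == to) {
          result = Some(reversedPath.reverse)
        } else {
          for (t <- edges.getOrElse(layout, Vector.empty) if !visited(t.to)) {
            visited += t.to
            queue.enqueue((t.to, t :: reversedPath))
          }
        }
      }
      result
    }
  }
}
```

For instance, if only a batch-to-row transition for one plugin and a row-to-batch transition for another plugin are registered, `findPath` between the two batch layouts returns the two-step chain through the row layout, and returns `None` in the reverse direction.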
The API should be treated as stable as soon as possible, after a few
development cycles.
h3. Test / Integration
The API should be tested solely within Spark's code base, using mocked
RowType / BatchType implementations. Apache Gluten can integrate with this API
promptly once it is ready. We will also work with other SQL plugins to propose
the changes essential for their integration.
[1] https://github.com/apache/spark/blob/8cd1be4fb380429dec536da2629ba88b6ebc39d3/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L494-L511
[2] https://github.com/apache/gluten/blob/2dcee79e3105607bbc83af4e98e08531dc3fce59/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala#L45-L79