[ https://issues.apache.org/jira/browse/SPARK-57468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hongze Zhang updated SPARK-57468:
---------------------------------
Description:
h3. Summary
This is a proposal to extend Spark's SQL query plan API to support custom
ColumnarBatch types.
h3. Background
Many Spark SQL plugin projects currently ship their own implementations of
Spark's ColumnarBatch. However, Spark cannot identify or handle these properly
during the query planning phase.
For example, Spark's `RowToColumnarExec`[1] can only convert InternalRow to
Spark's built-in column vectors. As a result, the rule
`ApplyColumnarRulesAndInsertTransitions` cannot insert row-to-columnar
transitions for projects such as Apache Gluten, which rely on custom vector
types.
As another example, Spark has no `ColumnarToColumnar` transition for converting
vectorized data between different types, e.g., {*}between Spark Parquet / ORC
vectors and Arrow vectors{*}, or to and from custom vector types when a
third-party plugin is used.
h3. Goal
The goal is to extend Spark's ability to handle multiple / custom ColumnarBatch
types. This would let downstream SQL plugins integrate their own vectorized
data layouts with Apache Spark much more easily. It should also let such
plugins work together (e.g., Gluten + Lance) under the same standardized
specification, without substantial design conflicts.
h3. The Change
To achieve this goal, the proposed solution is to add a `convention` API to
`SparkPlan`, consisting of a `RowType` and a `BatchType`. This tells the query
planner the concrete data layout that each plan node consumes / produces.
The shape can closely follow the `GlutenPlan` API[2] in Apache Gluten, adapted
and optimized for Spark's code base.
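To make the idea concrete, here is a minimal, Spark-free sketch. It assumes that a node exposes both the layout it produces and the layout it expects from its children. All names in it are hypothetical stand-ins, not Spark or Gluten APIs: `Node`, `requiredChildConvention`, `mismatches` and the case objects. It only shows how per-node conventions would let the planner find the parent/child edges that need a transition.

{code:scala}
// Hypothetical, Spark-free sketch: none of these names are real Spark APIs.
// Stand-ins for the proposed Convention.RowType / Convention.BatchType.
trait RowType
trait BatchType
case object NoRow extends RowType          // node emits no rows
case object VanillaRow extends RowType     // Spark InternalRow
case object NoBatch extends BatchType      // node emits no batches
case object VeloxBatch extends BatchType   // a plugin-defined batch type

// The data layout a node produces or consumes.
final case class Convention(rowType: RowType, batchType: BatchType)

// Minimal stand-in for SparkPlan carrying convention information.
final case class Node(
    name: String,
    convention: Convention,              // layout this node produces
    requiredChildConvention: Convention, // layout it expects from its children
    children: Seq[Node] = Seq.empty)

// Collect every (parent, produced, required) triple whose conventions disagree;
// these are the edges where the planner would need to insert a transition.
def mismatches(node: Node): Seq[(String, Convention, Convention)] =
  node.children.flatMap { child =>
    val here =
      if (child.convention != node.requiredChildConvention)
        Seq((node.name, child.convention, node.requiredChildConvention))
      else Seq.empty[(String, Convention, Convention)]
    here ++ mismatches(child)
  }

val rowScan = Node("RowScan", Convention(VanillaRow, NoBatch), Convention(NoRow, NoBatch))
val veloxFilter = Node(
  "VeloxFilter",
  Convention(NoRow, VeloxBatch),
  Convention(NoRow, VeloxBatch),
  Seq(rowScan))

// VeloxFilter expects Velox batches, but RowScan produces vanilla rows.
val found = mismatches(veloxFilter)
{code}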
Each `RowType` / `BatchType` can be registered together with the transitions it
supports, e.g.:
{code:scala}
object VeloxBatchType extends Convention.BatchType {
  override protected def registerTransitions(): Unit = {
    // Vanilla Spark rows -> Velox batches (row-to-columnar).
    fromRow(Convention.RowType.VanillaRowType, RowToVeloxColumnarExec.apply)
    // Velox batches -> vanilla Spark rows (columnar-to-row).
    toRow(Convention.RowType.VanillaRowType, VeloxColumnarToRowExec.apply)
  }
}
{code}
All registered transitions form a graph. `ApplyColumnarRulesAndInsertTransitions`
searches this graph to determine the final combination of transitions to insert
between adjacent query plan nodes.
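The graph search could look roughly like the sketch below. It makes two assumptions: layouts are plain string labels, and the search is a plain breadth-first search that picks the chain with the fewest transitions. `TransitionGraph`, `Transition`, `register` and `findPath` are hypothetical names. The actual rule may use a different search or a cost model.

{code:scala}
import scala.collection.mutable

// Hypothetical sketch: strings stand in for the proposal's RowType / BatchType
// objects, and `nodeName` for the plan node a registered transition would insert.
final case class Transition(from: String, to: String, nodeName: String)

final class TransitionGraph {
  private val edges = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[Transition]]

  // Roughly what a fromRow / toRow (or columnar-to-columnar) registration would record.
  def register(t: Transition): Unit =
    edges.getOrElseUpdate(t.from, mutable.ArrayBuffer.empty[Transition]) += t

  // Breadth-first search: the chain with the fewest transitions from `from` to `to`,
  // or None when no registered path exists.
  def findPath(from: String, to: String): Option[List[Transition]] = {
    val prev = mutable.Map.empty[String, Transition]
    val visited = mutable.Set(from)
    val queue = mutable.Queue(from)
    while (queue.nonEmpty && !visited(to)) {
      val cur = queue.dequeue()
      edges.getOrElse(cur, mutable.ArrayBuffer.empty[Transition]).foreach { t =>
        if (!visited(t.to)) {
          visited += t.to
          prev(t.to) = t
          queue.enqueue(t.to)
        }
      }
    }
    if (!visited(to)) None
    else {
      // Walk back from `to` through `prev` to rebuild the chain in order.
      var path = List.empty[Transition]
      var node = to
      while (node != from) {
        val t = prev(node)
        path = t :: path
        node = t.from
      }
      Some(path)
    }
  }
}

val graph = new TransitionGraph
graph.register(Transition("VanillaRow", "VeloxBatch", "RowToVeloxColumnarExec"))
graph.register(Transition("VeloxBatch", "VanillaRow", "VeloxColumnarToRowExec"))
graph.register(Transition("VanillaBatch", "VanillaRow", "ColumnarToRowExec"))

// No direct VanillaBatch -> VeloxBatch edge is registered, so the search chains two.
val chain = graph.findPath("VanillaBatch", "VeloxBatch").map(_.map(_.nodeName))
{code}

Here `chain` is `Some(List("ColumnarToRowExec", "RowToVeloxColumnarExec"))`. Counting hops is the simplest criterion. A real planner might instead assign a cost to each transition, for example to prefer a direct columnar-to-columnar conversion over a round trip through rows.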
The API should be treated as stable as soon as possible, after a few
development cycles.
h3. Test / Integration
The API should be tested solely within Spark's code base, using mocked RowType /
ColumnarTypes. Apache Gluten can integrate with the API promptly once it is
ready. We will also work with other SQL plugins to propose the changes they need
in order to integrate.
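A mocked test inside Spark could follow the pattern below. This is only a sketch under stated assumptions. The mocked layouts, the `Node` class and `insertTransitions` are hypothetical. To keep it short, only direct (single-hop) transitions are looked up.

{code:scala}
// Hypothetical mocked layouts, in the spirit of testing with mocked types.
sealed trait Layout
case object MockRow extends Layout
case object MockBatchA extends Layout
case object MockBatchB extends Layout

// Minimal plan node: what it consumes from its children and what it produces.
final case class Node(name: String, in: Option[Layout], out: Layout, children: List[Node] = Nil)

// Mocked registry of direct transitions: (from, to) -> transition node name.
val transitions: Map[(Layout, Layout), String] = Map(
  (MockRow, MockBatchA) -> "RowToBatchA",
  (MockBatchA, MockBatchB) -> "BatchAToBatchB")

// Bottom-up: wrap every child whose output differs from what its parent consumes.
// Fails if no direct transition is registered for the required pair.
def insertTransitions(node: Node): Node = {
  val fixed = node.children.map { original =>
    val child = insertTransitions(original)
    node.in match {
      case Some(required) if required != child.out =>
        val name = transitions.getOrElse((child.out, required),
          throw new IllegalStateException(s"No transition ${child.out} -> $required"))
        Node(name, Some(child.out), required, List(child))
      case _ => child
    }
  }
  node.copy(children = fixed)
}

// A mocked plan: a row-producing scan feeding an operator that consumes MockBatchA.
val plan = Node("Op", Some(MockBatchA), MockBatchA, List(Node("Scan", None, MockRow)))
val planned = insertTransitions(plan)
{code}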
[1]
[https://github.com/apache/spark/blob/8cd1be4fb380429dec536da2629ba88b6ebc39d3/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L494-L511]
[2]
[https://github.com/apache/gluten/blob/2dcee79e3105607bbc83af4e98e08531dc3fce59/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala#L45-L79]
> Extend SQL query plan API to support custom ColumnarBatch types
> ---------------------------------------------------------------
>
> Key: SPARK-57468
> URL: https://issues.apache.org/jira/browse/SPARK-57468
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.1.2
> Reporter: Hongze Zhang
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)