[ 
https://issues.apache.org/jira/browse/SPARK-57468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongze Zhang updated SPARK-57468:
---------------------------------
    Description: 
h3. Summary

This is a proposal to extend Spark's SQL query plan API to support custom 
ColumnarBatch types.
h3. Background

Currently, Spark has three built-in ColumnarBatch types: on-heap, off-heap,
and Arrow. Meanwhile, many Spark SQL plugin projects have their own
ColumnarBatch implementations, which Spark cannot identify or handle well
during the query planning phase.

For example, Spark's `RowToColumnarExec`[1] can only convert InternalRow to
Spark's built-in column vectors. This makes the rule
`ApplyColumnarRulesAndInsertTransitions` unusable for projects like Apache
Gluten when adding row-to-columnar transitions, because such projects rely on
custom vector types.

As another example, Spark has no `ColumnarToColumnar` transition to convert
vectorized data between different types, e.g., {*}between Spark Parquet / ORC
vectors and Arrow vectors{*}, or to and from custom vector types when a
third-party plugin is used.
h3. Goal

The goal is to extend Spark's ability to handle multiple / custom
ColumnarBatch types, so that downstream SQL plugins can integrate their own
vectorized data layouts with Apache Spark much more easily. It should also
make it possible for plugins to work together (e.g., Gluten + Lance) under
the same standardized specification, without substantial design conflicts.
h3. The Change

To achieve the above goal, the proposed solution is to add a `convention` API
to `SparkPlan`, consisting of `RowType` / `BatchType`, which tells the query
planner the concrete data layout that each plan node consumes / produces.

The shape can be very similar to the `GlutenPlan` API[2] in Apache Gluten, but
adapted and optimized for Spark's code base.
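
As a rough illustration of the idea (not the proposed API itself), the sketch
below models a plan node that declares the layout it produces and the layout
it expects from its children, so that a planner can spot the edges where a
transition would be needed. All names here (`Layout`, `Node`,
`mismatchedEdges`, and the example layouts) are hypothetical, and the code
does not depend on Spark.
{code:scala}
// Hypothetical, Spark-free model of a per-node "convention".
sealed trait Layout
case object VanillaRow extends Layout   // stands in for Spark's InternalRow
case object VanillaBatch extends Layout // stands in for Spark's built-in columnar batches
case object CustomBatch extends Layout  // stands in for a plugin-defined batch type

// Each node states what it produces and what it requires from its children.
final case class Node(
    name: String,
    produces: Layout,
    requiresFromChildren: Layout,
    children: Seq[Node] = Nil)

object ConventionSketch {
  // Collect every (child, parent) edge where the child's output layout differs
  // from what the parent requires, i.e. where a transition must be inserted.
  def mismatchedEdges(node: Node): Seq[(String, String)] = {
    val here = node.children.collect {
      case c if c.produces != node.requiresFromChildren => (c.name, node.name)
    }
    here ++ node.children.flatMap(mismatchedEdges)
  }

  def main(args: Array[String]): Unit = {
    val scan = Node("scan", produces = VanillaBatch, requiresFromChildren = VanillaBatch)
    val filter = Node("filter", CustomBatch, CustomBatch, Seq(scan))
    val collect = Node("collect", VanillaRow, VanillaRow, Seq(filter))
    println(mismatchedEdges(collect))
  }
}
{code}
In this toy plan both edges are mismatched: the scan emits built-in batches
while the filter expects the custom batch type, and the filter emits custom
batches while the root expects rows.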

`RowType` / `BatchType` can be registered along with their supported 
transitions, e.g.,
{code:scala}
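// Example: a plugin-defined batch type registering its supported row transitions.
object VeloxBatchType extends Convention.BatchType {
  override protected def registerTransitions(): Unit = {
    // Transition from vanilla Spark rows into this batch type.
    fromRow(Convention.RowType.VanillaRowType, RowToVeloxColumnarExec.apply)
    // Transition from this batch type back to vanilla Spark rows.
    toRow(Convention.RowType.VanillaRowType, VeloxColumnarToRowExec.apply)
  }
}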
{code}
All registered transitions form a graph, which
`ApplyColumnarRulesAndInsertTransitions` searches to determine the final
combination of transitions to insert between adjacent query plan nodes.
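
One way to picture this search is a breadth-first search over a directed graph
whose vertices are data layouts and whose edges are registered transitions.
The sketch below is self-contained and uses plain strings in place of real
`RowType` / `BatchType` objects and plan nodes. `Transition`,
`TransitionGraph`, `register`, and `findPath` are hypothetical names, and the
search strategy in the actual proposal may differ (for example, it could be
cost-based).
{code:scala}
import scala.collection.immutable.Queue

// Hypothetical sketch: layouts are vertices, registered transitions are directed edges.
final case class Transition(from: String, to: String, name: String)

final class TransitionGraph {
  private var edges = Map.empty[String, List[Transition]]

  def register(t: Transition): Unit =
    edges = edges.updated(t.from, edges.getOrElse(t.from, Nil) :+ t)

  // Breadth-first search: returns a shortest chain of transitions from `src`
  // to `dst`, or None when no chain of registered transitions connects them.
  def findPath(src: String, dst: String): Option[List[Transition]] = {
    @annotation.tailrec
    def loop(
        frontier: Queue[(String, List[Transition])],
        visited: Set[String]): Option[List[Transition]] =
      frontier.dequeueOption match {
        case None => None
        case Some(((layout, path), rest)) =>
          if (layout == dst) Some(path.reverse) // path is accumulated in reverse
          else {
            val next = edges.getOrElse(layout, Nil).filterNot(t => visited(t.to))
            loop(rest ++ next.map(t => (t.to, t :: path)), visited ++ next.map(_.to))
          }
      }
    loop(Queue((src, List.empty[Transition])), Set(src))
  }
}

object TransitionGraphDemo {
  def build(): TransitionGraph = {
    val g = new TransitionGraph
    g.register(Transition("VanillaRow", "VeloxBatch", "RowToVeloxColumnar"))
    g.register(Transition("VeloxBatch", "VanillaRow", "VeloxColumnarToRow"))
    g.register(Transition("VanillaRow", "VanillaBatch", "RowToColumnar"))
    g.register(Transition("VanillaBatch", "VanillaRow", "ColumnarToRow"))
    g
  }

  def main(args: Array[String]): Unit = {
    // No direct VeloxBatch -> VanillaBatch edge is registered here.
    println(build().findPath("VeloxBatch", "VanillaBatch").map(_.map(_.name)))
  }
}
{code}
Because this toy graph registers no direct batch-to-batch edge, the search
falls back to a two-step chain through rows. Registering a direct
columnar-to-columnar transition would let the same search return a one-step
chain instead.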

The API should be treated as stable as soon as possible, after a few
development cycles.
h3. Test / Integration

The API should be tested entirely within Spark's code base, using mocked
`RowType` / `BatchType` implementations. Apache Gluten can integrate with this
API promptly once it is ready. We will also work with other SQL plugins to
propose the changes they need in order to integrate.

[1] 
[https://github.com/apache/spark/blob/8cd1be4fb380429dec536da2629ba88b6ebc39d3/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L494-L511]
[2] 
[https://github.com/apache/gluten/blob/2dcee79e3105607bbc83af4e98e08531dc3fce59/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala#L45-L79]


> Extend SQL query plan API to support custom ColumnarBatch types
> ---------------------------------------------------------------
>
>                 Key: SPARK-57468
>                 URL: https://issues.apache.org/jira/browse/SPARK-57468
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 4.3.0
>            Reporter: Hongze Zhang
>            Priority: Major


