[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:
----------------------------------------
    Description: 
*SPIP: Columnar Processing Without Arrow Formatting Guarantees.*

 

*Q1.* What are you trying to do? Articulate your objectives using absolutely no 
jargon.

The Dataset/DataFrame API in Spark currently exposes data to users only one row at a 
time when processing it.  The goals of this SPIP are to
 # Extend the current SQL extensions mechanism so advanced users can have 
access to the physical SparkPlan and manipulate it to provide columnar 
processing for existing operators, including shuffle.  This will allow them to 
implement their own cost-based optimizers to decide when processing should be 
columnar and when it should not.
 # Make any transitions between the columnar memory layout and a row-based 
layout transparent to users, so operations that are not columnar see the 
data as rows, and operations that are columnar see the data as columns.

 

These are not requirements, but things that would be nice to have.
 # Transition the existing in-memory columnar layouts to be compatible with 
Apache Arrow.  This would make the transformations to the Apache Arrow format a 
no-op. The existing formats are already very close to those layouts in many 
cases.  This would not use the Apache Arrow Java library, but instead 
be compatible with the memory 
[layout|https://arrow.apache.org/docs/format/Layout.html], and possibly only a 
subset of that layout.

 

*Q2.* What problem is this proposal NOT designed to solve? 

The goal is not ML/AI support, but to provide APIs for accelerated computing 
in Spark, primarily targeting SQL/ETL-like workloads.  ML/AI frameworks already have 
several mechanisms to get data into and out of them. Those can be improved, but that will 
be covered in a separate SPIP.

This is not trying to implement any of the processing itself in a columnar way, 
with the exception of examples for documentation.

This does not cover exposing the underlying format of the data.  The only way 
to get at the data in a ColumnVector is through the public APIs.  Exposing the 
underlying format to improve efficiency will be covered in a separate SPIP.

This is not trying to implement new ways of transferring data to external ML/AI 
applications.  That is covered by separate SPIPs already.

This is not trying to add generic code generation for columnar processing.  
Currently, code generation for columnar processing is only supported when 
translating columns to rows.  We will continue to support that, but will not 
extend it as a general solution; that will be covered in a separate SPIP if we 
find it is helpful.  For now, columnar processing will be interpreted.

This is not trying to expose a way to get columnar data into Spark through 
DataSource V2 or any other similar API.  That would be covered by a separate 
SPIP if we find it is needed.

 

*Q3.* How is it done today, and what are the limits of current practice?

The current columnar support is limited to 3 areas.
 # Internal implementations of FileFormats can optionally return a 
ColumnarBatch instead of rows.  The code generation phase knows how to take 
that columnar data and iterate through it as rows for stages that want rows, 
which currently is almost everything.  The limitations here are mostly 
implementation specific. The current standard is to abuse Scala’s type erasure 
to return ColumnarBatches as the elements of an RDD[InternalRow] (see the sketch 
after this list). The code generation can handle this because it is generating Java 
code, so it bypasses Scala’s type checking and just casts the InternalRow to the desired 
ColumnarBatch.  This makes it difficult for others to implement the same 
functionality for different processing, because they can only do it through code 
generation. There is no clean separate path in the code generation for 
columnar vs. row-based processing. Additionally, because this is only supported through 
code generation, there is no fallback if code generation fails for any reason.  
That is typically fine for input formats, but can be problematic when we get 
into more extensive processing.
 # When caching data, it can optionally be cached in a columnar format if the 
input is also columnar.  This is similar to the first area and has the same 
limitations, because the cache acts as an input, but it is the only piece of 
code that also consumes columnar data as an input.
 # Pandas vectorized processing.  To be able to support Pandas UDFs, Spark will 
build up a batch of data, send it to Python for processing, and then get a 
batch of data back as a result.  The data sent to Python can be in either pickle 
format, which is the default, or optionally Arrow; the result comes back in the 
same format. The limitations here are really around 
performance: transforming the data back and forth can be very expensive.
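
To make the type-erasure hack in the first area concrete, here is a minimal illustrative sketch (a simplified helper, not the actual FileFormat code path) of how columnar data ends up hidden inside an RDD[InternalRow]:
{code:java}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative only: an input that really produces ColumnarBatches is exposed
// as an RDD[InternalRow]. Type erasure means nothing fails here; generated
// Java code later casts each "row" back to a ColumnarBatch.
def exposeBatchesAsRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
  batches.asInstanceOf[RDD[InternalRow]]
{code}
Because the cast back only happens inside generated code, there is no typed, non-codegen path that other implementations can hook into.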

 

*Q4.* What is new in your approach and why do you think it will be successful?

What we are primarily doing is cleaning up a lot of existing functionality, 
refactoring it, and making it more generic. We think we can be successful 
because we have already completed a proof of concept that shows columnar 
processing can be efficiently done in Spark.

 

*Q5.* Who cares? If you are successful, what difference will it make?

Anyone who wants to accelerate Spark.  At Spark+AI Summit this year (2019), I 
spoke with multiple companies (7 by my count, including Facebook) trying to do 
this, using FPGAs, GPUs, or CPU SIMD instructions to get faster, more 
efficient processing.  This will help all of them provide a clean 
implementation of accelerated ETL processing, without hacks like overriding 
internal Spark classes by putting their jars first on the classpath, which many of 
these companies are currently doing.

 

*Q6.* What are the risks?

Technologically I don’t see many risks.  We have done a proof-of-concept 
implementation that shows it can be done; it is just a matter of putting those 
changes in place.

 

*Q7.* How long will it take?

I suspect that we can put together a patch with tests in a month. Adding 
documentation and iterating on the APIs would likely put it at a month 
and a half to two months, so one quarter would probably give us enough time to 
get through everything.

 

*Q8.* What are the mid-term and final “exams” to check for success?

The first check for success would be to transition the existing 
columnar code over to using this behind the scenes. That means separating 
out the code that goes from rows to columns when sending data to Python for 
processing by Pandas UDFs, and also the code that goes from columns to rows for 
the file formats, caching, and the output of Pandas UDFs.

 

The final success is really about adoption: seeing whether the follow-on work 
that we, and hopefully others, do shows that this cleanly enables something 
that was not possible before.

 

*Appendix A:*

For the most part, the APIs will not need to change in any backwards-incompatible 
way.  Also note that these are not necessarily the final forms of 
the APIs; they mostly reflect the general direction we want to go in, so 
there is no need to include nits in the discussion of the APIs.  Those can be 
covered by code reviews.

 

{{ColumnarBatch}} and {{ColumnVector}} will need to move from one jar to 
another so that the Catalyst {{Expression}} class has access to them.

 

The following will be added to Expression:

 
{code:java}
/**
 * Returns true if this expression supports columnar processing through [[columnarEval]].
 */
def supportsColumnar: Boolean = false

/**
 * Returns the result of evaluating this expression on the entire ColumnarBatch. The result of
 * calling this may be a single ColumnVector, or a scalar value. Scalar values can happen if they
 * are a part of the expression, or in some cases may be an optimization, like using the batch's
 * null count to know that isNull is false for the entire batch without doing any calculations.
 */
def columnarEval(batch: ColumnarBatch): Any = {
  throw new IllegalStateException(s"Internal Error ${this.getClass} has column support mismatch")
}
{code}
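
As a rough illustration, a hypothetical expression opting in to these hooks could look like the following. This is a sketch only: it assumes the methods above have been added to Expression, and the class name and its behavior are made up for the example.
{code:java}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.LeafExpression
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, IntegerType}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical example: a literal can answer columnarEval with a scalar for
// the whole batch, one of the cases called out in the comment above.
case class ColumnarIntLiteral(value: Int) extends LeafExpression with CodegenFallback {
  override def dataType: DataType = IntegerType
  override def nullable: Boolean = false
  override def eval(input: InternalRow): Any = value

  // Proposed additions from this SPIP (assumes they exist on Expression):
  override def supportsColumnar: Boolean = true
  override def columnarEval(batch: ColumnarBatch): Any = value
}
{code}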
 

SparkPlan will be updated to include
{code:java}
/**
 * Return true if this stage of the plan supports columnar execution.
 */
def supportsColumnar: Boolean = false

/**
 * The exact types of the columns that are output in columnar processing mode
 * (used for faster codegen of transitions from columns to rows).
 */
def vectorTypes: Option[Seq[String]] = None

/**
 * Returns the result of this query as an RDD[ColumnarBatch] by delegating to
 * `doExecuteColumnar` after preparations.
 *
 * Concrete implementations of SparkPlan should override `doExecuteColumnar` if
 * `supportsColumnar` returns true.
 */
final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  doExecuteColumnar()
}

/**
 * Produces the result of the query as an `RDD[ColumnarBatch]` if [[supportsColumnar]] returns
 * true.
 */
protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
  // If an operator overrides supportsColumnar but not this, blow up.
  throw new IllegalStateException(s"Internal Error ${this.getClass} has column support" +
    s" mismatch:\n${this}")
}
{code}
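For illustration, a minimal hypothetical operator using these hooks might look like the sketch below. It assumes the methods above exist on SparkPlan; the class is made up and simply forwards its child's batches.
{code:java}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical pass-through operator that advertises columnar support.
case class PassThroughColumnarExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  // Proposed additions from this SPIP (assumes they exist on SparkPlan):
  override def supportsColumnar: Boolean = true
  override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
    child.executeColumnar()

  // Row-based path, still required by SparkPlan; a real operator would either
  // implement it or rely on the planner inserting a columns-to-rows transition.
  override protected def doExecute(): RDD[InternalRow] =
    throw new IllegalStateException(s"Row-based execution is not supported by $nodeName")
}
{code}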
In BufferedRowIterator, init will change to reflect that the values in it could 
be ColumnarBatches too, which is already the case today.
{code:java}
-  public abstract void init(int index, Iterator<InternalRow>[] iters);
+  public abstract void init(int index, Iterator<Object>[] iters);
{code}
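This widening means consuming code has to distinguish the two element kinds at runtime. An illustrative check (a hypothetical helper, not the actual generated code) might look like:
{code:java}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative only: each element of the widened Iterator<Object> may be a
// whole ColumnarBatch or a single InternalRow.
def rowsIn(element: Object): Long = element match {
  case batch: ColumnarBatch => batch.numRows().toLong
  case _: InternalRow       => 1L
  case other => throw new IllegalStateException(s"Unexpected element type: ${other.getClass}")
}
{code}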
 
 SparkSessionExtensions will have new APIs for columnar processing.
{code:java}
type ColumnarRuleBuilder = SparkSession => ColumnarRule

def injectColumnar(builder: ColumnarRuleBuilder): Unit

/**
 * :: Experimental ::
 * Holds a rule that runs prior to inserting column-to-row and row-to-column transitions,
 * to allow for injecting a columnar implementation into various operators, and a rule that
 * runs after, to allow overriding the implementation of those transitions and potentially
 * cleaning up the plan (like inserting batching of columnar data for more
 * efficient processing).
 */
@DeveloperApi
@Experimental
@Unstable
class ColumnarRule {
  def pre(plan: SparkPlan): SparkPlan = plan
  def post(plan: SparkPlan): SparkPlan = plan
}
{code}
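As a usage sketch (hypothetical class and package names, and assuming injectColumnar and ColumnarRule land as shown above), an advanced user could then plug in their own columnar planning through the existing extensions mechanism:
{code:java}
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical rule: `pre` would swap supported operators for columnar
// versions before the row/column transitions are inserted; `post` could
// replace or batch those transitions afterwards.
class MyColumnarRule extends ColumnarRule {
  override def pre(plan: SparkPlan): SparkPlan = plan
  override def post(plan: SparkPlan): SparkPlan = plan
}

// Registered the same way other session extensions are, e.g. via
// spark.sql.extensions=com.example.MyExtensions (class name is made up).
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectColumnar(_ => new MyColumnarRule)
  }
}
{code}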

  was:
*Q1.* What are you trying to do? Articulate your objectives using absolutely no 
jargon.

 

The Dataset/DataFrame API in Spark currently exposes data to users only one row at a 
time when processing it.  The goals of this are to

 
 # Expose to end users a new option of processing the data in a columnar 
format, multiple rows at a time, with the data organized into contiguous arrays 
in memory. 
 # Make any transitions between the columnar memory layout and a row based 
layout transparent to the end user.
 # Allow for simple data exchange with other systems, DL/ML libraries, pandas, 
etc. by having clean APIs to transform the columnar data into an Apache Arrow 
compatible layout.
 # Provide a plugin mechanism for columnar processing support so an advanced 
user could avoid data transition between columnar and row based processing even 
through shuffles. This means we should at least support pluggable APIs so an 
advanced end user can implement the columnar partitioning themselves, and 
provide the glue necessary to shuffle the data still in a columnar format.
 # Expose new APIs that allow advanced users or frameworks to implement 
columnar processing either as UDFs, or by adjusting the physical plan to do 
columnar processing.  If the latter is too controversial we can move it to 
another SPIP, but we plan to implement some accelerated computing in parallel 
with this feature to be sure the APIs work, and without this feature that would 
be impossible.

 

Not Requirements, but things that would be nice to have.
 # Provide default implementations for partitioning columnar data, so users 
don’t have to.
 # Transition the existing in memory columnar layouts to be compatible with 
Apache Arrow.  This would make the transformations to Apache Arrow format a 
no-op. The existing formats are already very close to those layouts in many 
cases.  This would not be using the Apache Arrow java library, but instead 
being compatible with the memory 
[layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
subset of that layout.
 # Provide a clean transition from the existing code to the new one.  The 
existing APIs which are public but evolving are not that far off from what is 
being proposed.  We should be able to create a new parallel API that can wrap 
the existing one. This means any file format that is trying to support columnar 
can still do so until we make a conscious decision to deprecate and then turn 
off the old APIs.

 

*Q2.* What problem is this proposal NOT designed to solve?

This is not trying to implement any of the processing itself in a columnar way, 
with the exception of examples for documentation, and possibly default 
implementations for partitioning of columnar shuffle. 

 

*Q3.* How is it done today, and what are the limits of current practice?

The current columnar support is limited to 3 areas.
 # Input formats can optionally return a ColumnarBatch instead of rows.  The 
code generation phase knows how to take that columnar data and iterate through 
it as rows for stages that want rows, which currently is almost everything.  
The limitations here are mostly implementation specific. The current standard 
is to abuse Scala’s type erasure to return ColumnarBatches as the elements of 
an RDD[InternalRow]. The code generation can handle this because it is 
generating Java code, so it bypasses Scala’s type checking and just casts the 
InternalRow to the desired ColumnarBatch.  This makes it difficult for others 
to implement the same functionality for different processing, because they can 
only do it through code generation. There is no clean separate path in 
the code generation for columnar vs. row-based processing. Additionally, because this 
is only supported through code generation, there is no fallback if code generation 
fails for any reason.  This is typically fine for input formats but can be 
problematic when we get into more extensive processing. 
 # When caching data it can optionally be cached in a columnar format if the 
input is also columnar.  This is similar to the first area and has the same 
limitations because the cache acts as an input, but it is the only piece of 
code that also consumes columnar data as an input.
 # Pandas vectorized processing.  To be able to support Pandas UDFs Spark will 
build up a batch of data and send it to Python for processing, and then get a 
batch of data back as a result.  The format of the data being sent to python 
can either be pickle, which is the default, or optionally Arrow. The result 
returned is the same format. The limitations here really are around 
performance.  Transforming the data back and forth can be very expensive.

 

*Q4.* What is new in your approach and why do you think it will be successful?

Conceptually what we are primarily doing is cleaning up a lot of existing 
functionality and getting it ready to be exposed as public facing APIs. We 
think we can be successful because we have already completed a proof of concept 
that shows columnar processing can be efficiently done in Spark.

 

*Q5.* Who cares? If you are successful, what difference will it make?

Anyone who wants to integrate Spark with other data processing systems will 
care, as it provides a cleaner, more standards-based way of transferring the 
data.  Anyone who wants to experiment with accelerated computing, either on a 
CPU or GPU, will benefit, as it provides a supported way to make this work 
instead of trying to hack something in, as was necessary before.

 

*Q6.* What are the risks?

 

Technologically I don’t see many risks.  We have done a proof-of-concept 
implementation that shows it can be done.  The biggest risk is if Apache 
Arrow loses favor as an interchange format between different systems.  In the 
worst case, that means we may have to support transforming the data into other 
formats, but because Arrow as the internal format is not a requirement, and there 
really are not that many ways to lay out columnar data, it should require minor 
changes if anything. 

 

The second risk is that we don’t set up the API in a powerful/flexible enough way 
for users to really take full advantage of it.  This will really only become clear 
with time, and if we start off fairly conservatively with our API we can 
hopefully expand it in the future.

 

The final risk is around dictionaries. The current columnar layout exposes the 
dictionary as just a Java class.  When we go to an Arrow-compatible layout, the 
dictionary has a specific format, and the Parquet APIs don’t give raw access to the 
layout of the dictionary.  This is something we can overcome; it 
just means we may have less than optimal translations, in terms of performance, 
when going from Parquet-formatted data to Arrow until we can get some changes 
into the Parquet API.  The same may be true of ORC.  Either way, it is no worse off 
than it is today.


 

*Q7.* How long will it take?

I suspect that we can put together a patch with tests in a month. Adding 
documentation and iterating on the APIs would likely put it at a month 
and a half to two months, so one quarter would give us enough time to get 
through everything, including reviews, if things go well.

 

*Q8.* What are the mid-term and final “exams” to check for success?

The first check for success would be to transition the existing 
columnar code over to using this.  That would mean converting the code that 
transforms and sends data to Python for processing by Pandas UDFs, along with the 
file format and caching code, to use a cleaner, more explicit columnar API. 

 

The final success is really about adoption: seeing whether the follow-on work 
that we, and hopefully others, do shows that this cleanly enables something 
that was not possible before.


> SPIP: Public APIs for extended Columnar Processing Support
> ----------------------------------------------------------
>
>                 Key: SPARK-27396
>                 URL: https://issues.apache.org/jira/browse/SPARK-27396
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Robert Joseph Evans
>            Priority: Major
>


