[ https://issues.apache.org/jira/browse/SPARK-39145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Erik Krogen resolved SPARK-39145.
---------------------------------
Resolution: Duplicate
Closing this as a duplicate.
> CLONE - SPIP: Public APIs for extended Columnar Processing Support
> ------------------------------------------------------------------
>
> Key: SPARK-39145
> URL: https://issues.apache.org/jira/browse/SPARK-39145
> Project: Spark
> Issue Type: Epic
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Abhi Shah
> Assignee: Robert Joseph Evans
> Priority: Major
>
> *SPIP: Columnar Processing Without Arrow Formatting Guarantees.*
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely
> no jargon.
> The Dataset/DataFrame API in Spark currently exposes data to users only one row
> at a time during processing. The goals of this proposal are to:
> # Add to the current SQL extensions mechanism so advanced users can access the
> physical SparkPlan and manipulate it to provide columnar processing for existing
> operators, including shuffle. This will allow them to implement their own
> cost-based optimizers to decide when processing should be columnar and when it
> should not.
> # Make any transitions between the columnar memory layout and a row based
> layout transparent to the users so operations that are not columnar see the
> data as rows, and operations that are columnar see the data as columns.
>
> Not requirements, but things that would be nice to have:
> # Transition the existing in memory columnar layouts to be compatible with
> Apache Arrow. This would make the transformations to Apache Arrow format a
> no-op. The existing formats are already very close to those layouts in many
> cases. This would not be using the Apache Arrow java library, but instead
> being compatible with the memory
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a
> subset of that layout.
>
> *Q2.* What problem is this proposal NOT designed to solve?
> The goal of this is not for ML/AI but to provide APIs for accelerated
> computing in Spark primarily targeting SQL/ETL like workloads. ML/AI already
> have several mechanisms to get data into/out of them. These can be improved
> but will be covered in a separate SPIP.
> This is not trying to implement any of the processing itself in a columnar
> way, with the exception of examples for documentation.
> This does not cover exposing the underlying format of the data. The only way
> to get at the data in a ColumnVector is through the public APIs. Exposing
> the underlying format to improve efficiency will be covered in a separate
> SPIP.
> This is not trying to implement new ways of transferring data to external
> ML/AI applications. That is covered by separate SPIPs already.
> This is not trying to add in generic code generation for columnar processing.
> Currently code generation for columnar processing is only supported when
> translating columns to rows. We will continue to support this, but will not
> extend it as a general solution. That will be covered in a separate SPIP if
> we find it is helpful. For now columnar processing will be interpreted.
> This is not trying to expose a way to get columnar data into Spark through
> DataSource V2 or any other similar API. That would be covered by a separate
> SPIP if we find it is needed.
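>
> To make the point above about accessing a ColumnVector only through the public
> APIs concrete, here is a minimal sketch that reads a batch back using only the
> ColumnarBatch/ColumnVector accessors. It builds the batch with OnHeapColumnVector
> purely for illustration; the values and structure are assumptions, not part of
> the proposal.
> {code:java}
> import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
>
> // Build a tiny single-column batch (illustration only).
> val col = new OnHeapColumnVector(3, IntegerType)
> col.putInt(0, 1)
> col.putInt(1, 2)
> col.putNull(2)
> val batch = new ColumnarBatch(Array[ColumnVector](col))
> batch.setNumRows(3)
>
> // Read it back using only the public accessors.
> val vector: ColumnVector = batch.column(0)
> for (row <- 0 until batch.numRows()) {
>   if (vector.isNullAt(row)) println(s"row $row: null")
>   else println(s"row $row: ${vector.getInt(row)}")
> }
> batch.close()
> {code}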
>
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
> # Internal implementations of FileFormats can optionally return a ColumnarBatch
> instead of rows. The code generation phase knows how to take that columnar data
> and iterate through it as rows for stages that want rows, which currently is
> almost everything. The limitations here are mostly implementation specific. The
> current standard is to abuse Scala's type erasure to return ColumnarBatches as
> the elements of an RDD[InternalRow] (see the sketch after this list). The code
> generation can handle this because it is generating Java code, so it bypasses
> Scala's type checking and just casts the InternalRow to the desired
> ColumnarBatch. This makes it difficult for others to implement the same
> functionality for different processing because they can only do it through code
> generation. There really is no clean separate path in the code generation for
> columnar vs. row based. Additionally, because it is only supported through code
> generation, if for any reason code generation fails there is no backup. This is
> typically fine for input formats but can be problematic when we get into more
> extensive processing.
> # When caching data it can optionally be cached in a columnar format if the
> input is also columnar. This is similar to the first area and has the same
> limitations because the cache acts as an input, but it is the only piece of
> code that also consumes columnar data as an input.
> # Pandas vectorized processing. To be able to support Pandas UDFs, Spark will
> build up a batch of data and send it to Python for processing, and then get a
> batch of data back as a result. The format of the data being sent to Python can
> either be pickle, which is the default, or optionally Arrow. The result returned
> is the same format. The limitations here really are around performance.
> Transforming the data back and forth can be very expensive.
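>
> The sketch referenced in the first item above: a minimal, purely illustrative
> example of the type-erasure trick (this is not how any real FileFormat is
> written; the data and structure here are assumptions).
> {code:java}
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
>
> // A producer that really yields ColumnarBatch objects...
> val col = new OnHeapColumnVector(2, IntegerType)
> col.putInt(0, 10)
> col.putInt(1, 20)
> val batch = new ColumnarBatch(Array[ColumnVector](col))
> batch.setNumRows(2)
> val batches: Iterator[ColumnarBatch] = Iterator(batch)
>
> // ...is exposed with the element type InternalRow. Type erasure means nothing
> // verifies this at runtime.
> val disguised: Iterator[InternalRow] = batches.asInstanceOf[Iterator[InternalRow]]
>
> // Generated code that knows the trick casts each element straight back.
> val recovered = disguised.next().asInstanceOf[ColumnarBatch]
> println(recovered.column(0).getInt(1)) // prints 20
> {code}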
>
> *Q4.* What is new in your approach and why do you think it will be successful?
> What we are primarily doing is cleaning up a lot of existing functionality,
> refactoring it, and making it more generic. We think we can be successful
> because we have already completed a proof of concept that shows columnar
> processing can be efficiently done in Spark.
>
> *Q5.* Who cares? If you are successful, what difference will it make?
> Anyone who wants to accelerate Spark. At Spark+AI Summit this year (2019), I
> spoke with multiple companies (7 by my count, including Facebook) trying to do
> this, either using FPGAs, GPUs, or CPU SIMD instructions to get faster, more
> efficient processing. This will help all of them to provide a clean
> implementation of accelerated ETL processing, without hacks like overriding
> internal Spark classes by putting jars first on the classpath, which many of
> these companies are currently doing.
>
> *Q6.* What are the risks?
> Technologically I don’t see many risks. We have done a proof of concept
> implementation that shows it can be done, it is just a matter of putting
> those changes in place.
>
> *Q7.* How long will it take?
> I suspect that we can put together a patch with tests in a month. Adding
> documentation and iterating on the APIs would likely push that to a month and a
> half to two months, so one quarter would probably give us enough time to get
> through everything.
>
> *Q8.* What are the mid-term and final “exams” to check for success?
> The first check for success would be to successfully transition the existing
> columnar code over to using this behind the scenes. That means separating out
> the code that goes from rows to columns when sending data to Python for
> processing by Pandas UDFs, and also the code that goes from columns to rows for
> the file formats, caching, and the output of Pandas UDFs.
>
> The final success is really about adoption: seeing whether the follow-on work
> that we, and hopefully others, do shows that it cleanly enables something that
> was not possible before.
>
> *Appendix A:*
> For the most part the APIs will not need to change in any backwards-incompatible
> way. Also note that these are not necessarily final changes to
> the APIs, but mostly reflect the general direction that we want to go in, so
> there is no need to include nits in the discussion on the APIs. Those can be
> covered by code reviews.
>
> {{ColumnarBatch}} and {{ColumnVector}} will need to move from one jar to
> another so that the Catalyst Expression class has access to them.
>
> The following will be added to Expression:
>
> {code:java}
> /**
>  * Returns true if this expression supports columnar processing through [[columnarEval]].
>  */
> def supportsColumnar: Boolean = false
>
> /**
>  * Returns the result of evaluating this expression on the entire ColumnarBatch. The result of
>  * calling this may be a single ColumnVector, or a scalar value. Scalar values can happen if they
>  * are a part of the expression or in some cases may be an optimization, like using the batch's
>  * null count to know that isNull is false for the entire batch without doing any calculations.
>  */
> def columnarEval(batch: ColumnarBatch): Any = {
>   throw new IllegalStateException(s"Internal Error ${this.getClass} has column support mismatch")
> }
> {code}
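>
> As a purely illustrative sketch of how a caller could consume these proposed
> hooks (the helper name below is an assumption, and the final API may differ):
> {code:java}
> import org.apache.spark.sql.catalyst.expressions.Expression
> import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
>
> // Hypothetical helper: evaluate an expression against a whole batch and handle
> // both result shapes the comment above describes.
> def evalOnBatch(expr: Expression, batch: ColumnarBatch): Either[Any, ColumnVector] = {
>   require(expr.supportsColumnar, s"$expr does not support columnar evaluation")
>   expr.columnarEval(batch) match {
>     case v: ColumnVector => Right(v)      // one value per input row
>     case scalar          => Left(scalar)  // a single value for the entire batch
>   }
> }
> {code}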
>
> SparkPlan will be updated to include
> {code:java}
> /**
>  * Return true if this stage of the plan supports columnar execution.
>  */
> def supportsColumnar: Boolean = false
>
> /**
>  * The exact types of the columns that are output in columnar processing mode
>  * (used for faster codegen of transitions from columns to rows).
>  */
> def vectorTypes: Option[Seq[String]] = None
>
> /**
>  * Returns the result of this query as an RDD[ColumnarBatch] by delegating to
>  * `doExecuteColumnar` after preparations.
>  *
>  * Concrete implementations of SparkPlan should override `doExecuteColumnar` if
>  * `supportsColumnar` returns true.
>  */
> final def executeColumnar(): RDD[ColumnarBatch] = executeQuery {
>   if (isCanonicalizedPlan) {
>     throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
>   }
>   doExecuteColumnar()
> }
>
> /**
>  * Produces the result of the query as an `RDD[ColumnarBatch]` if [[supportsColumnar]] returns
>  * true.
>  */
> protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
>   // If the user updates supportsColumnar, but not this, blow up.
>   throw new IllegalStateException(s"Internal Error ${this.getClass} has column support" +
>     s" mismatch:\n${this}")
> }
> {code}
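>
> For illustration only, a hypothetical operator built on these proposed SparkPlan
> hooks might look roughly like the following. The operator name and its
> pass-through behavior are assumptions; a real operator would transform each
> ColumnarBatch.
> {code:java}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.Attribute
> import org.apache.spark.sql.execution.SparkPlan
> import org.apache.spark.sql.vectorized.ColumnarBatch
>
> // Hypothetical pass-through operator that advertises columnar execution.
> case class PassThroughColumnarExec(child: SparkPlan) extends SparkPlan {
>   override def supportsColumnar: Boolean = true
>   override def output: Seq[Attribute] = child.output
>   override def children: Seq[SparkPlan] = Seq(child)
>
>   // Row-based execution is intentionally unsupported; the planner is expected to
>   // insert a columns-to-rows transition or call executeColumnar() instead.
>   override protected def doExecute(): RDD[InternalRow] =
>     throw new IllegalStateException(s"Row-based execution not supported by $nodeName")
>
>   override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
>     child.executeColumnar() // a real operator would map over each batch here
> }
> {code}
>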
> In BufferedRowIterator, init will change to reflect that the values in it
> could be ColumnarBatches too, which is already the case today.
> {code:java}
> - public abstract void init(int index, Iterator<InternalRow>[] iters);
> + public abstract void init(int index, Iterator<Object>[] iters);
> {code}
>
> SparkSessionExtensions will have new APIs for columnar processing.
> {code:java}
> type ColumnarRuleBuilder = SparkSession => ColumnarRule
>
> def injectColumnar(builder: ColumnarRuleBuilder): Unit
>
> /**
>  * :: Experimental ::
>  * Holds a rule that runs prior to inserting column-to-row and row-to-column transitions to
>  * allow for injecting a columnar implementation into various operators, and a rule that
>  * runs after to allow overriding the implementation of those transitions, and potentially
>  * cleaning up the plan (like inserting batching of columnar data for more efficient processing).
>  */
> @DeveloperApi
> @Experimental
> @Unstable
> class ColumnarRule {
>   def pre(plan: SparkPlan): SparkPlan = plan
>   def post(plan: SparkPlan): SparkPlan = plan
> }
> {code}
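>
> A hypothetical usage sketch against the API above (the extension class name is an
> assumption, and the final ColumnarRule surface may differ from the pre/post
> methods shown here):
> {code:java}
> import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
> import org.apache.spark.sql.execution.SparkPlan
>
> // Hypothetical extensions class that injects a do-nothing ColumnarRule. A real
> // rule would swap operators for columnar versions in pre() and tidy up the
> // resulting transitions in post().
> class MyColumnarExtensions extends (SparkSessionExtensions => Unit) {
>   override def apply(ext: SparkSessionExtensions): Unit = {
>     ext.injectColumnar((session: SparkSession) => new ColumnarRule {
>       override def pre(plan: SparkPlan): SparkPlan = plan
>       override def post(plan: SparkPlan): SparkPlan = plan
>     })
>   }
> }
> {code}
> Such an extension would be enabled the same way other session extensions are,
> for example by setting spark.sql.extensions to the class name.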
--
This message was sent by Atlassian Jira
(v8.20.7#820007)