Kazuaki Ishizaki,

Yes, ColumnarBatchScan does provide a framework for doing code generation
for the processing of columnar data.  I have to admit that I don't have a
deep understanding of the code generation piece, so if I get something
wrong please correct me.  From what I had seen only input formats currently
inherent from ColumnarBatchScan, and from comments in the trait

  /**
   * Generate [[ColumnVector]] expressions for our parent to consume as
rows.
   * This is called once per [[ColumnarBatch]].
   */
https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43

It appears that ColumnarBatchScan is really only intended to pull out the
data from the batch, and not to process that data in a columnar fashion.
The Loading stage that you mentioned.

> The SIMDzation or GPUization capability depends on a compiler that
translates native code from the code generated by the whole-stage codegen.
To be able to support vectorized processing Hive stayed with pure java and
let the JVM detect and do the SIMDzation of the code.  To make that happen
they created loops to go through each element in a column and remove all
conditionals from the body of the loops.  To the best of my knowledge that
would still require a separate code path like I am proposing to make the
different processing phases generate code that the JVM can compile down to
SIMD instructions.  The generated code is full of null checks for each
element which would prevent the operations we want.  Also, the intermediate
results are often stored in UnsafeRow instances.  This is really fast for
row-based processing, but the complexity of how they work I believe would
prevent the JVM from being able to vectorize the processing.  If you have a
better way to take java code and vectorize it we should put it into OpenJDK
instead of spark so everyone can benefit from it.

Trying to compile directly from generated java code to something a GPU can
process is something we are tackling but we decided to go a different route
from what you proposed.  From talking with several compiler experts here at
NVIDIA my understanding is that IBM in partnership with NVIDIA attempted in
the past to extend the JVM to run at least partially on GPUs, but it was
really difficult to get right, especially with how java does memory
management and memory layout.

To avoid that complexity we decided to split the JITing up into two
separate pieces.  I didn't mention any of this before because this
discussion was intended to just be around the memory layout support, and
not GPU processing.  The first part would be to take the Catalyst AST and
produce CUDA code directly from it.  If properly done we should be able to
do the selection and projection phases within a single kernel.  The biggest
issue comes with UDFs as they cannot easily be vectorized for the CPU or
GPU.  So to deal with that we have a prototype written by the compiler team
that is trying to tackle SPARK-14083 which can translate basic UDFs into
catalyst expressions.  If the UDF is too complicated or covers operations
not yet supported it will fall back to the original UDF processing.  I
don't know how close the team is to submit a SPIP or a patch for it, but I
do know that they have some very basic operations working.  The big issue
is that it requires java 11+ so it can use standard APIs to get the byte
code of scala UDFs.

We split it this way because we thought it would be simplest to implement,
and because it would provide a benefit to more than just GPU accelerated
queries.

Thanks,

Bobby

On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <ishiz...@jp.ibm.com>
wrote:

> Looks interesting discussion.
> Let me describe the current structure and remaining issues. This is
> orthogonal to cost-benefit trade-off discussion.
>
> The code generation basically consists of three parts.
> 1. Loading
> 2. Selection (map, filter, ...)
> 3. Projection
>
> 1. Columnar storage (e.g. Parquet, Orc, Arrow , and table cache) is well
> abstracted by using ColumnVector (
> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java)
> class. By combining with ColumnarBatchScan, the whole-stage code generation
> generate code to directly get valus from the columnar storage if there is
> no row-based operation.
> Note: The current master does not support Arrow as a data source. However,
> I think it is not technically hard to support Arrow.
>
> 2. The current whole-stage codegen generates code for element-wise
> selection (excluding sort and join). The SIMDzation or GPUization
> capability depends on a compiler that translates native code from the code
> generated by the whole-stage codegen.
>
> 3. The current Projection assume to store row-oriented data, I think that
> is a part that Wenchen pointed out
>
> My slides
> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
> <https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use>may
> simplify the above issue and possible implementation.
>
>
>
> FYI. NVIDIA will present an approach to exploit GPU with Arrow thru Python
> at SAIS 2019
> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110.
> I think that it uses Python UDF support with Arrow in Spark.
>
> P.S. I will give a presentation about in-memory data storages for SPark at
> SAIS 2019
> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
> :)
>
> Kazuaki Ishizaki
>
>
>
> From:        Wenchen Fan <cloud0...@gmail.com>
> To:        Bobby Evans <bo...@apache.org>
> Cc:        Spark dev list <dev@spark.apache.org>
> Date:        2019/03/26 13:53
> Subject:        Re: [DISCUSS] Spark Columnar Processing
> ------------------------------
>
>
>
> Do you have some initial perf numbers? It seems fine to me to remain
> row-based inside Spark with whole-stage-codegen, and convert rows to
> columnar batches when communicating with external systems.
>
> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <*bo...@apache.org*
> <bo...@apache.org>> wrote:
> This thread is to discuss adding in support for data frame processing
> using an in-memory columnar format compatible with Apache Arrow.  My main
> goal in this is to lay the groundwork so we can add in support for GPU
> accelerated processing of data frames, but this feature has a number of
> other benefits.  Spark currently supports Apache Arrow formatted data as an
> option to exchange data with python for pandas UDF processing. There has
> also been discussion around extending this to allow for exchanging data
> with other tools like pytorch, tensorflow, xgboost,... If Spark supports
> processing on Arrow compatible data it could eliminate the
> serialization/deserialization overhead when going between these systems.
> It also would allow for doing optimizations on a CPU with SIMD instructions
> similar to what Hive currently supports. Accelerated processing using a GPU
> is something that we will start a separate discussion thread on, but I
> wanted to set the context a bit.
> Jason Lowe, Tom Graves, and I created a prototype over the past few months
> to try and understand how to make this work.  What we are proposing is
> based off of lessons learned when building this prototype, but we really
> wanted to get feedback early on from the community. We will file a SPIP
> once we can get agreement that this is a good direction to go in.
>
> The current support for columnar processing lets a Parquet or Orc file
> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s type
> erasure. The code generation is aware that the RDD actually holds
> ColumnarBatchs and generates code to loop through the data in each batch as
> InternalRows.
>
>
> Instead, we propose a new set of APIs to work on an
> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
> propose adding in a Rule similar to how WholeStageCodeGen currently works.
> Each part of the physical SparkPlan would expose columnar support through a
> combination of traits and method calls. The rule would then decide when
> columnar processing would start and when it would end. Switching between
> columnar and row based processing is not free, so the rule would make a
> decision based off of an estimate of the cost to do the transformation and
> the estimated speedup in processing time.
>
>
> This should allow us to disable columnar support by simply disabling the
> rule that modifies the physical SparkPlan.  It should be minimal risk to
> the existing row-based code path, as that code should not be touched, and
> in many cases could be reused to implement the columnar version.  This also
> allows for small easily manageable patches. No huge patches that no one
> wants to review.
>
>
> As far as the memory layout is concerned OnHeapColumnVector and
> OffHeapColumnVector are already really close to being Apache Arrow
> compatible so shifting them over would be a relatively simple change.
> Alternatively we could add in a new implementation that is Arrow compatible
> if there are reasons to keep the old ones.
>
>
> Again this is just to get the discussion started, any feedback is welcome,
> and we will file a SPIP on it once we feel like the major changes we are
> proposing are acceptable.
>
> Thanks,
>
> Bobby Evans
>
>

Reply via email to