[
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750740#comment-16750740
]
Hyukjin Kwon commented on SPARK-26413:
--------------------------------------
I think SPARK-26412 can be resolved together if this one is resolved.
> SPIP: RDD Arrow Support in Spark Core and PySpark
> -------------------------------------------------
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, Spark Core
> Affects Versions: 3.0.0
> Reporter: Richard Whitcomb
> Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming a standard interchange format for columnar structured
> data. This is already true in Spark, which uses Arrow for the Pandas UDF
> functions in the DataFrame API.
> However, the current implementation of Arrow in Spark is limited to two use
> cases (both sketched below):
> * Pandas UDFs, which allow operations on one or more columns in the
> DataFrame API.
> * Collect as Pandas, which pulls the entire dataset back to the driver as a
> Pandas DataFrame.
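> For reference, those two existing paths look roughly like this in PySpark (a
> minimal sketch against the Spark 2.4-era API; df, spark, and the column names
> are illustrative):
> {code}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # Use case 1: Pandas UDF -- Arrow ships one or more columns to Python in batches.
> @pandas_udf('double', PandasUDFType.SCALAR)
> def plus_one(v):
>     return v + 1
>
> df.withColumn('value_plus_one', plus_one(df['value']))
>
> # Use case 2: collect the whole dataset to the driver as a Pandas DataFrame.
> spark.conf.set('spark.sql.execution.arrow.enabled', 'true')
> pdf = df.toPandas()
> {code}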
> What is still hard, however, is making use of all of the columns of a DataFrame
> while staying distributed across the workers. The only way to do this
> currently is to drop down into RDDs and collect the rows back into a pandas
> DataFrame, but pickling is very slow and the collecting is expensive.
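> For illustration, the workaround available today looks roughly like this (a
> hedged sketch; the pickling of every Row on both sides is where the cost goes):
> {code}
> import pandas as pd
>
> # Today: drop to the RDD of pickled Rows, rebuild a pandas DataFrame per
> # partition, and return the results as plain Python tuples (pickled again).
> def per_partition(rows):
>     pdf = pd.DataFrame([row.asDict() for row in rows])
>     pdf = pdf[['email']]   # any pandas/NumPy work happens here
>     return pdf.itertuples(index=False)
>
> result = df.rdd.mapPartitions(per_partition)
> {code}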
> The proposal is to extend Spark in a way that lets users operate on a full
> Arrow Table while still making use of Spark's underlying technology.
> Some examples of what this new API would make possible:
> * Pass the Arrow Table with zero copy to PyTorch for predictions (see the
> sketch after this list).
> * Pass it to NVIDIA RAPIDS so an algorithm can be run on the GPU.
> * Distribute data across many GPUs, making use of the new barrier execution
> APIs.
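> As a sketch of the first bullet (hedged: the model object and the 'features'
> column are hypothetical, and zero-copy conversion only applies to non-null
> primitive columns):
> {code}
> import torch  # assumed to be installed on the workers
>
> def predict(arrow_table, model):
>     # Expose each Arrow chunk to NumPy without copying, then wrap it as a
>     # torch tensor (torch.from_numpy also shares the underlying buffer).
>     chunks = arrow_table.column('features').chunks
>     tensors = [torch.from_numpy(c.to_numpy(zero_copy_only=True)) for c in chunks]
>     with torch.no_grad():
>         return [model(t) for t in tensors]
> {code}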
> h2. Target users and personas
> ML engineers, data scientists, and future library authors.
> h2. Goals
> * Conversion from any Dataset[Row] or PySpark DataFrame to RDD[ArrowTable].
> * Conversion back from any RDD[ArrowTable] to Dataset[Row], RDD[Row], or a
> PySpark DataFrame.
> * Open up the possibility of tighter integration between Arrow/Pandas/Spark,
> especially at the library level.
> h2. Non-Goals
> * Not creating an entirely new API, but instead extending existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> {code:java}
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> {code}
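> On the Python side the same object would simply surface as a pyarrow.Table,
> which is itself a schema plus record batches (illustrative sketch):
> {code}
> import pyarrow as pa
>
> # The Python-side view of ArrowTable: a schema plus one or more record batches.
> batch = pa.RecordBatch.from_arrays([pa.array(['[email protected]'])], names=['email'])
> table = pa.Table.from_batches([batch])   # table.schema is inferred from the batch
> {code}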
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Iterable of Arrow batches.
> def arrowRDD: RDD[ArrowTable]
>
> // Utility function to expose the RDD of Arrow Tables to PySpark as serialized bytes
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
> // Converts an RDD[ArrowTable] to a DataFrame by inspecting the Arrow schema
> def arrowToDataframe(implicit ev: T <:< ArrowTable): DataFrame
>
> // Converts an RDD[ArrowTable] to an RDD of Rows
> def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]
> {code}
> h3. Serializers.py
> {code}
> # Serializer that takes serialized Arrow tables and returns a pyarrow Table.
> class ArrowSerializer(FramedSerializer):
> {code}
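> A minimal sketch of what such a serializer might look like, assuming each
> frame carries one table as an Arrow IPC stream (the exact wire format between
> the JVM and Python is an open implementation detail):
> {code}
> import pyarrow as pa
> from pyspark.serializers import FramedSerializer
>
> class ArrowSerializer(FramedSerializer):
>     """Each frame is an Arrow IPC stream; loads() yields a pyarrow.Table."""
>
>     def dumps(self, table):
>         sink = pa.BufferOutputStream()
>         writer = pa.ipc.new_stream(sink, table.schema)
>         writer.write_table(table)
>         writer.close()
>         return sink.getvalue().to_pybytes()
>
>     def loads(self, obj):
>         return pa.ipc.open_stream(obj).read_all()
> {code}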
> h3. RDD.py
> {code}
> # New RDD class that has an RDD[ArrowTable] behind it and uses the new
> # ArrowSerializer instead of the normal pickle serializer.
> class ArrowRDD(RDD):
> {code}
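> Concretely, the class would be little more than a constructor that swaps in
> the Arrow serializer (a sketch; ArrowSerializer is the proposed class from
> Serializers.py above, not an existing PySpark class):
> {code}
> from pyspark.rdd import RDD
>
> class ArrowRDD(RDD):
>     """RDD whose elements are pyarrow.Tables rather than pickled Python objects."""
>
>     def __init__(self, jrdd, ctx):
>         # jrdd is a JavaRDD[Array[Byte]] of Arrow IPC streams (e.g. the output
>         # of the proposed javaToPythonArrow helper); ArrowSerializer turns
>         # each element into a pyarrow.Table.
>         RDD.__init__(self, jrdd, ctx, ArrowSerializer())
> {code}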
>
> h3. Dataframe.py
> {code}
> # New function that converts a PySpark DataFrame into an ArrowRDD.
> def arrow(self):
> {code}
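> One way the hook could be wired up (a sketch; javaToPythonArrow and ArrowRDD
> are the pieces proposed above, and exposing it as a property matches the
> df.arrow.map(...) usage in the example below):
> {code}
> @property
> def arrow(self):
>     # Ask the JVM side for a JavaRDD[Array[Byte]] of Arrow IPC streams via
>     # the proposed javaToPythonArrow helper, then wrap it in an ArrowRDD so
>     # each element deserializes to a pyarrow.Table.
>     jrdd = self._jdf.javaToPythonArrow()
>     return ArrowRDD(jrdd, self.sql_ctx._sc)
> {code}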
>
> h2. Example API Usage
> h3. Pyspark
> {code}
> # Select a single column using Pandas
> def map_table(arrow_table):
>     import pyarrow as pa
>     pdf = arrow_table.to_pandas()
>     pdf = pdf[['email']]
>     return pa.Table.from_pandas(pdf)
>
> # Convert to an Arrow RDD, map over the tables, convert back to a DataFrame
> df.arrow.map(map_table).dataframe
> {code}
> h3. Scala
>
> {code:java}
> // Find N Centroids using Cuda Rapids kMeans
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
>
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
> {code}
>
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark
> users to interact with Arrow tools and libraries. This does, however, come
> with some considerations from a Spark perspective.
> Arrow is column-based instead of row-based. In the above API proposal of
> RDD[ArrowTable], each RDD row will in fact be a block of data. A further
> proposal in this regard is to introduce a new parameter to Spark called
> spark.sql.execution.arrow.maxRecordsPerTable. This parameter decides how many
> records are included in a single Arrow Table: if set to -1, the entire
> partition is included in one table; otherwise each table holds at most that
> number of records. Within that limit the normal blocking mechanism of Arrow is
> used to include multiple batches, still governed by
> spark.sql.execution.arrow.maxRecordsPerBatch (see the sketch below).
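> To make the sizing rule concrete, the per-partition grouping could look
> something like the following (a sketch only; the parameter name and the
> grouping strategy are part of this proposal, not existing Spark behaviour):
> {code}
> import pyarrow as pa
>
> def batches_to_tables(batches, max_records_per_table):
>     """Group a partition's record batches into Tables of at most N records.
>     A value of -1 means the whole partition becomes a single Table."""
>     current, rows = [], 0
>     for batch in batches:
>         if (current and max_records_per_table != -1
>                 and rows + batch.num_rows > max_records_per_table):
>             yield pa.Table.from_batches(current)
>             current, rows = [], 0
>         current.append(batch)
>         rows += batch.num_rows
>     if current:
>         yield pa.Table.from_batches(current)
> {code}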