[ https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817081#comment-16817081 ]
Robert Joseph Evans commented on SPARK-26413:
---------------------------------------------

SPARK-27396 covers this, but with a slightly different approach.

SPIP: RDD Arrow Support in Spark Core and PySpark
-------------------------------------------------

             Key: SPARK-26413
             URL: https://issues.apache.org/jira/browse/SPARK-26413
         Project: Spark
      Issue Type: Improvement
      Components: PySpark, Spark Core
Affects Versions: 3.0.0
        Reporter: Richard Whitcomb
        Priority: Minor

h2. Background and Motivation

Arrow is becoming a standard interchange format for columnar structured data. This is already true in Spark, which uses Arrow for the pandas UDF functions in the DataFrame API.

However, the current Arrow integration in Spark is limited to two use cases:
* Pandas UDFs, which allow operations on one or more columns in the DataFrame API.
* Collect-as-pandas, which pulls the entire dataset back to the driver as a pandas DataFrame.

What is still hard is making use of all of the columns of a DataFrame while staying distributed across the workers. The only way to do this today is to drop down to RDDs and operate on the rows directly, but pickling rows is very slow and collecting them is expensive.

The proposal is to extend Spark so that users can operate on an Arrow Table directly while still making use of Spark's underlying machinery. Some examples of what this new API would make possible:
* Pass the Arrow Table with zero copy to PyTorch for predictions.
* Pass the table to NVIDIA RAPIDS so an algorithm can run on the GPU.
* Distribute data across many GPUs using the new barrier execution API.

h2. Target users and personas

ML engineers, data scientists, and future library authors.

h2. Goals
* Conversion from any Dataset[Row] or PySpark DataFrame to RDD[ArrowTable].
* Conversion back from any RDD[ArrowTable] to Dataset[Row], RDD[Row], or a PySpark DataFrame.
* Open the door to tighter integration between Arrow, pandas, and Spark, especially at the library level.

h2. Non-Goals
* Not creating a new API, but instead building on existing APIs.

h2. Proposed API changes

h3. Data Objects
{code:java}
case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
{code}

h3. Dataset.scala
{code:java}
// Converts a Dataset to an RDD of Arrow Tables.
// Each RDD row is an Iterable of Arrow batches.
def arrowRDD: RDD[ArrowTable]

// Utility function to convert to an RDD of Arrow Tables for PySpark.
private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
{code}

h3. RDD.scala
{code:java}
// Converts RDD[ArrowTable] to a DataFrame by inspecting the Arrow schema.
def arrowToDataframe(implicit ev: T <:< ArrowTable): DataFrame

// Converts RDD[ArrowTable] to an RDD of Rows.
def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]
{code}

h3. Serializers.py
{code}
# Serializer that takes serialized Arrow Tables and returns a pyarrow Table.
class ArrowSerializer(FramedSerializer)
{code}

h3. RDD.py
{code}
# New RDD class that is backed by an RDD[ArrowTable] and uses the new
# ArrowSerializer instead of the normal pickle serializer.
class ArrowRDD(RDD)
{code}

h3. Dataframe.py
{code}
# New function that converts a PySpark DataFrame into an ArrowRDD.
def arrow(self):
{code}
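To make the Python side of the proposal a little more concrete, here is a minimal sketch of how the proposed ArrowSerializer could sit on top of PySpark's existing FramedSerializer and pyarrow's IPC stream APIs. It assumes the JVM side delivers each ArrowTable as one complete Arrow IPC stream (schema plus record batches) per frame; the actual wire format and buffer ownership would be decided during implementation.

{code}
# Sketch only: assumes one complete Arrow IPC stream per frame from the JVM.
import pyarrow as pa
from pyspark.serializers import FramedSerializer


class ArrowSerializer(FramedSerializer):
    """Converts framed Arrow IPC bytes to and from pyarrow.Table objects."""

    def dumps(self, table):
        # Serialize a pyarrow.Table into a single IPC stream of bytes.
        sink = pa.BufferOutputStream()
        with pa.ipc.new_stream(sink, table.schema) as writer:
            writer.write_table(table)
        return sink.getvalue().to_pybytes()

    def loads(self, obj):
        # Read every record batch in the frame into one pyarrow.Table.
        reader = pa.ipc.open_stream(obj)
        return reader.read_all()
{code}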
h2. Example API Usage

h3. PySpark
{code}
# Select a single column using pandas.
def map_table(arrow_table):
    import pyarrow as pa
    pdf = arrow_table.to_pandas()
    pdf = pdf[['email']]
    return pa.Table.from_pandas(pdf)

# Convert to an Arrow RDD, map over the tables, convert back to a DataFrame.
df.arrow.map(map_table).dataframe
{code}

h3. Scala
{code:java}
// Find N centroids using CUDA RAPIDS k-means.
def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable

// Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row].
df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
{code}

h2. Implementation Details

As mentioned in the first section, the goal is to make it easier for Spark users to interact with Arrow tools and libraries. This does, however, come with some considerations from a Spark perspective.

Arrow is column based rather than row based. In the RDD[ArrowTable] API proposed above, each RDD row is in fact a block of data. A further proposal here is to introduce a new Spark parameter, spark.sql.execution.arrow.maxRecordsPerTable, which decides how many records are included in a single Arrow Table. If set to -1, the entire partition is included in one table; otherwise a table holds at most that many records. Within a table, Arrow's normal batching mechanism is used to include multiple record batches, still governed by spark.sql.execution.arrow.maxRecordsPerBatch.
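As a rough illustration of the intended semantics of the proposed maxRecordsPerTable setting (not the executor-side implementation, which would live on the JVM), the following sketch groups a partition's existing record batches, already sized by maxRecordsPerBatch, into tables. The function name and signature are hypothetical.

{code}
# Sketch only: illustrates the grouping semantics of maxRecordsPerTable.
from typing import Iterable, Iterator, List

import pyarrow as pa


def group_batches_into_tables(
    batches: Iterable[pa.RecordBatch],
    max_records_per_table: int,
) -> Iterator[pa.Table]:
    """Group a partition's record batches into Arrow Tables of roughly
    max_records_per_table rows; -1 means one table for the whole partition.
    Tables are built from whole batches, so a table can overshoot the limit
    by less than one batch."""
    buffered: List[pa.RecordBatch] = []
    rows = 0
    for batch in batches:
        buffered.append(batch)
        rows += batch.num_rows
        if max_records_per_table != -1 and rows >= max_records_per_table:
            yield pa.Table.from_batches(buffered)
            buffered, rows = [], 0
    if buffered:
        yield pa.Table.from_batches(buffered)
{code}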