[
https://issues.apache.org/jira/browse/SPARK-32601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17194941#comment-17194941
]
Linar Savion commented on SPARK-32601:
--------------------------------------
Hi [~tahmad], this can be easily done using RecordBatch.serialize() to utilize
the stream format:
Or alternatively using `createDataFrame(pandasRDD=True)` after SPARK-32846 will
be hopefully merged.
Example for pyspark>v3.0.0
{code:java}
from pyspark.sql import SparkSession
import pyarrow as pa
def _arrow_record_batch_dumps(rb):
return bytearray(rb.serialize())
def rb_return(ardd):
data = [
pa.array(range(5), type='int16'),
pa.array([-10, -5, 0, None, 10], type='int32')
]
schema = pa.schema([pa.field('c0', pa.int16()),
pa.field('c1', pa.int32())],
metadata={b'foo': b'bar'})
return pa.RecordBatch.from_arrays(data, schema=schema)
if __name__ == '__main__':
spark = SparkSession \
.builder \
.appName("Python Arrow-in-Spark example") \
.getOrCreate()
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
sc = spark.sparkContext
ardd = spark.sparkContext.parallelize([0, 1, 2], 3)
ardd = ardd.map(rb_return)
from pyspark.sql.pandas.types import from_arrow_schema
from pyspark.sql.dataframe import DataFrame
# Filter out and cache arrow record batches
ardd = ardd.filter(lambda x: isinstance(x, pa.RecordBatch)).cache()
ardd = ardd.map(_arrow_record_batch_dumps)
schema = pa.schema([pa.field('c0', pa.int16()),
pa.field('c1', pa.int32())],
metadata={b'foo': b'bar'})
schema = from_arrow_schema(schema)
jrdd = ardd._to_java_object_rdd()
jdf = spark._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(),
spark._wrapped._jsqlContext)
df = DataFrame(jdf, spark._wrapped)
df._schema = schema
df.show()
{code}
> Issue in converting an RDD of Arrow RecordBatches in v3.0.0
> -----------------------------------------------------------
>
> Key: SPARK-32601
> URL: https://issues.apache.org/jira/browse/SPARK-32601
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.0.0
> Reporter: Tanveer
> Priority: Major
>
> The following simple code snippet for converting an RDD of Arrow
> RecordBatches works perfectly in Spark v2.3.4.
>
> {code:java}
> // code placeholder
> from pyspark.sql import SparkSession
> import pyspark
> import pyarrow as pa
> from pyspark.serializers import ArrowSerializer
> def _arrow_record_batch_dumps(rb):
> # Fix for interoperability between pyarrow version >=0.15 and Spark's
> arrow version
> # Streaming message protocol has changed, remove setting when upgrading
> spark.
> import os
> os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'
>
> return bytearray(ArrowSerializer().dumps(rb))
> def rb_return(ardd):
> data = [
> pa.array(range(5), type='int16'),
> pa.array([-10, -5, 0, None, 10], type='int32')
> ]
> schema = pa.schema([pa.field('c0', pa.int16()),
> pa.field('c1', pa.int32())],
> metadata={b'foo': b'bar'})
> return pa.RecordBatch.from_arrays(data, schema=schema)
> if __name__ == '__main__':
> spark = SparkSession \
> .builder \
> .appName("Python Arrow-in-Spark example") \
> .getOrCreate()
> # Enable Arrow-based columnar data transfers
> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
> sc = spark.sparkContext
> ardd = spark.sparkContext.parallelize([0,1,2], 3)
> ardd = ardd.map(rb_return)
> from pyspark.sql.types import from_arrow_schema
> from pyspark.sql.dataframe import DataFrame
> from pyspark.serializers import ArrowSerializer, PickleSerializer,
> AutoBatchedSerializer
> # Filter out and cache arrow record batches
> ardd = ardd.filter(lambda x: isinstance(x, pa.RecordBatch)).cache()
> ardd = ardd.map(_arrow_record_batch_dumps)
> schema = pa.schema([pa.field('c0', pa.int16()),
> pa.field('c1', pa.int32())],
> metadata={b'foo': b'bar'})
> schema = from_arrow_schema(schema)
> jrdd = ardd._to_java_object_rdd()
> jdf = spark._jvm.PythonSQLUtils.arrowPayloadToDataFrame(jrdd,
> schema.json(), spark._wrapped._jsqlContext)
> df = DataFrame(jdf, spark._wrapped)
> df._schema = schema
> df.show()
> {code}
>
> But after updating to Spark to v3.0.0, the same functionality with just
> changing arrowPayloadToDataFrame() -> toDataFrame() doesn't work.
>
> {code:java}
> // code placeholder
> from pyspark.sql import SparkSession
> import pyspark
> import pyarrow as pa
> #from pyspark.serializers import ArrowSerializerdef dumps(batch):
> import pyarrow as pa
> import io
> sink = io.BytesIO()
> writer = pa.RecordBatchFileWriter(sink, batch.schema)
> writer.write_batch(batch)
> writer.close()
> return sink.getvalue()def _arrow_record_batch_dumps(rb):
> # Fix for interoperability between pyarrow version >=0.15 and Spark's
> arrow version
> # Streaming message protocol has changed, remove setting when upgrading
> spark.
> #import os
> #os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1' #return
> bytearray(ArrowSerializer().dumps(rb))
> return bytearray(dumps(rb))
> def rb_return(ardd):
> data = [
> pa.array(range(5), type='int16'),
> pa.array([-10, -5, 0, None, 10], type='int32')
> ]
> schema = pa.schema([pa.field('c0', pa.int16()),
> pa.field('c1', pa.int32())],
> metadata={b'foo': b'bar'})
> return pa.RecordBatch.from_arrays(data, schema=schema)if __name__ ==
> '__main__':
> spark = SparkSession \
> .builder \
> .appName("Python Arrow-in-Spark example") \
> .getOrCreate() # Enable Arrow-based columnar data transfers
> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
> sc = spark.sparkContext ardd = spark.sparkContext.parallelize([0,1,2],
> 3)
> ardd = ardd.map(rb_return) from pyspark.sql.pandas.types import
> from_arrow_schema
> from pyspark.sql.dataframe import DataFrame # Filter out and cache
> arrow record batches
> ardd = ardd.filter(lambda x: isinstance(x, pa.RecordBatch)).cache()
> ardd = ardd.map(_arrow_record_batch_dumps) schema =
> pa.schema([pa.field('c0', pa.int16()),
> pa.field('c1', pa.int32())],
> metadata={b'foo': b'bar'})
> schema = from_arrow_schema(schema) jrdd = ardd._to_java_object_rdd()
> #jdf = spark._jvm.PythonSQLUtils.arrowPayloadToDataFrame(jrdd,
> schema.json(), spark._wrapped._jsqlContext)
> jdf = spark._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(),
> spark._wrapped._jsqlContext)
> df = DataFrame(jdf, spark._wrapped)
> df._schema = schema df.show(){code}
> First it gives error for Heap:
> {code:java}
> // code placeholder
> 20/08/12 12:18:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)20/08/12 12:18:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)java.lang.OutOfMemoryError: Java heap space at
> java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at
> java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at
> org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
> at
> org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:336)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$.loadBatch(ArrowConverters.scala:189)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.nextBatch(ArrowConverters.scala:165)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.<init>(ArrowConverters.scala:144)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$.fromBatchIterator(ArrowConverters.scala:143)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$.$anonfun$toDataFrame$1(ArrowConverters.scala:203)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$$$Lambda$1806/1325557847.apply(Unknown
> Source) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
> at org.apache.spark.rdd.RDD$$Lambda$1805/889467051.apply(Unknown Source) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at
> org.apache.spark.scheduler.Task.run(Task.scala:127) at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
> at
> org.apache.spark.executor.Executor$TaskRunner$$Lambda$1773/1728811302.apply(Unknown
> Source) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)20/08/12 12:18:48 ERROR
> SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor
> task launch worker for task 0,5,main]java.lang.OutOfMemoryError: Java heap
> space
> {code}
> And when using parameter --driver-memory 4g for this very small data, it
> gives:
> {code:java}
> 20/08/12 12:22:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)20/08/12 12:22:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)java.io.IOException: Unexpected end of stream trying to read message. at
> org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:671)
> at
> org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch(MessageSerializer.java:336)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$.loadBatch(ArrowConverters.scala:189)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.nextBatch(ArrowConverters.scala:165)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.<init>(ArrowConverters.scala:144)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$.fromBatchIterator(ArrowConverters.scala:143)
> at
> org.apache.spark.sql.execution.arrow.ArrowConverters$.$anonfun$toDataFrame$1(ArrowConverters.scala:203)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837) at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at
> org.apache.spark.scheduler.Task.run(Task.scala:127) at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)20/08/12 12:22:18 WARN
> TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, int2.bullx, executor
> driver): java.io.IOException: Unexpected end of stream trying to read message.
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]