surender godara created SPARK-43108:
---------------------------------------

             Summary: org.apache.spark.storage.StorageStatus 
NotSerializableException when try to access StorageStatus in a 
MapPartitionsFunction
                 Key: SPARK-43108
                 URL: https://issues.apache.org/jira/browse/SPARK-43108
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.1
            Reporter: surender godara


When you try to access the *storage status (org.apache.spark.storage.StorageStatus)* inside a MapPartitionsFunction, the getStorageStatus call throws a NotSerializableException. The exception is thrown because the StorageStatus object is not serializable, so it cannot be sent back over the RPC reply from the BlockManagerMasterEndpoint.

Here is an example code snippet that demonstrates how to access the storage 
status inside a MapPartitionsFunction in Spark:
{code:java}
StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();{code}
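The root cause can be reproduced with plain Java serialization, independent of Spark: ObjectOutputStream throws NotSerializableException for any object whose class does not implement java.io.Serializable. A minimal sketch, where the StorageStatusLike class is a hypothetical stand-in for StorageStatus (not the real Spark class):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class SerializationDemo {
    // Hypothetical stand-in for org.apache.spark.storage.StorageStatus:
    // a plain class that does NOT implement java.io.Serializable.
    static class StorageStatusLike {
        long maxMem = 1024L;
    }

    public static void main(String[] args) throws Exception {
        StorageStatusLike[] statuses = { new StorageStatusLike() };
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Serializing the array fails on its first element,
            // mirroring the "element of array (index: 0)" frame in the stack trace.
            out.writeObject(statuses);
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```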

*Error stacktrace --*
{code:java}
Caused by: java.io.NotSerializableException: org.apache.spark.storage.StorageStatus
Serialization stack:
    - object not serializable (class: org.apache.spark.storage.StorageStatus, value: org.apache.spark.storage.StorageStatus@715b4e82)
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.storage.StorageStatus;, size 2)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:286)
    at org.apache.spark.rpc.netty.RemoteNettyRpcCallContext.send(NettyRpcCallContext.scala:64)
    at org.apache.spark.rpc.netty.NettyRpcCallContext.reply(NettyRpcCallContext.scala:32)
    at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:156)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264){code}

*Steps to reproduce*

Step 1: Create a SparkSession in Spark standalone mode.

Step 2: Create a Dataset using the SparkSession object, loading data from a file or a database.

Step 3: Define a MapPartitionsFunction that fetches the storage status inside its call method.

Here is the code snippet for the MapPartitionsFunction:


 
{code:java}
df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Iterator<Row> input) throws Exception {
        // Fetching the storage status from an executor triggers the exception
        StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();
        return input;
    }
}, RowEncoder.apply(df.schema()));
{code}
 


Step 4: Submit the Spark job.

 

*Solution -*

Make org.apache.spark.storage.StorageStatus implement the java.io.Serializable interface.
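As a sanity check of the proposed fix, marking the class Serializable is enough for plain Java serialization of such an array to succeed. A sketch with the same hypothetical stand-in class as above (not the real Spark class); note that in the real StorageStatus, every field would also need to be serializable or transient:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFixDemo {
    // Hypothetical stand-in: the same shape as before, but now
    // implementing java.io.Serializable as the fix proposes.
    static class StorageStatusLike implements Serializable {
        private static final long serialVersionUID = 1L;
        long maxMem = 1024L;
    }

    public static void main(String[] args) throws Exception {
        StorageStatusLike[] statuses = { new StorageStatusLike() };
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(statuses); // no longer throws
        }
        System.out.println("serialized " + bytes.size() + " bytes");
    }
}
```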

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
