[ 
https://issues.apache.org/jira/browse/SPARK-43108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

surender godara updated SPARK-43108:
------------------------------------
    Description: 
When you try to access the *storage status 
(org.apache.spark.storage.StorageStatus)* inside a MapPartitionsFunction, the 
getStorageStatus method throws a NotSerializableException. The exception is 
thrown because the StorageStatus object is not serializable: as the stack 
trace below shows, it is raised when BlockManagerMasterEndpoint tries to 
serialize its RPC reply.

Here is an example code snippet that demonstrates how to access the storage 
status inside a MapPartitionsFunction in Spark:
{code:java}
StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();
{code}
*Error stack trace*
{code:java}
Caused by: java.io.NotSerializableException: org.apache.spark.storage.StorageStatus
Serialization stack:
    - object not serializable (class: org.apache.spark.storage.StorageStatus, value: org.apache.spark.storage.StorageStatus@715b4e82)
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.storage.StorageStatus;, size 2)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:286)
    at org.apache.spark.rpc.netty.RemoteNettyRpcCallContext.send(NettyRpcCallContext.scala:64)
    at org.apache.spark.rpc.netty.NettyRpcCallContext.reply(NettyRpcCallContext.scala:32)
    at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:156)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
{code}
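
The serialization stack in the trace (a non-serializable object as an element of an array) can be reproduced without Spark. The following is a minimal sketch using plain Java serialization. The class {{Status}} and the helper {{failsToSerialize}} are hypothetical stand-ins invented for illustration and are not part of Spark; only the failure mode is meant to match.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class NotSerializableDemo {

    // Hypothetical stand-in for StorageStatus: it does NOT implement Serializable.
    static class Status {
        final String executorId;
        final long maxMemory;

        Status(String executorId, long maxMemory) {
            this.executorId = executorId;
            this.maxMemory = maxMemory;
        }
    }

    // Returns true if Java serialization rejects the value
    // with a NotSerializableException, false if it serializes cleanly.
    static boolean failsToSerialize(Object value) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        try {
            out.writeObject(value);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } finally {
            out.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // The array type itself is serializable, but its elements are not,
        // so the write fails on the first element (index 0).
        Status[] statuses = { new Status("0", 1024L), new Status("1", 2048L) };
        System.out.println(failsToSerialize(statuses)); // prints "true"
    }
}
```

In the reported trace the same kind of failure appears to happen when the RPC reply is serialized, which is why the exception is raised inside BlockManagerMasterEndpoint rather than in the user function.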
*Steps to reproduce*

Step 1: Initialize a SparkSession in Spark standalone mode.

Step 2: Create a Dataset using the SparkSession and load data.

Step 3: Define a MapPartitionsFunction on the Dataset and get the storage 
status inside it.

Here is the MapPartitionsFunction code snippet:

 
{code:java}
df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Iterator<Row> input) throws Exception {
        // Fails here: the reply carrying StorageStatus[] cannot be serialized.
        StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();
        return input;
    }
}, RowEncoder.apply(df.schema()));
{code}
 

Step 4: Submit the Spark job.

 

*Solution*

Make org.apache.spark.storage.StorageStatus implement the Serializable interface.
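
As a rough illustration of the proposed change, the sketch below shows an array of a class that implements Serializable surviving a Java serialization round trip. The class {{Status}} and the helpers {{serialize}} and {{deserialize}} are hypothetical and stand in for the real Spark types. For the real StorageStatus, every non-transient field would also have to be serializable, which this sketch does not verify.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableStatusDemo {

    // Hypothetical stand-in for StorageStatus after the proposed change.
    static class Status implements Serializable {
        private static final long serialVersionUID = 1L;
        final String executorId;
        final long maxMemory;

        Status(String executorId, long maxMemory) {
            this.executorId = executorId;
            this.maxMemory = maxMemory;
        }
    }

    // Writes the value with Java serialization and returns the bytes.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Reads back a single object from the given bytes.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Status[] original = { new Status("0", 1024L), new Status("1", 2048L) };
        Status[] copy = (Status[]) deserialize(serialize(original));
        System.out.println(copy.length + " " + copy[1].executorId + " " + copy[1].maxMemory);
        // prints "2 1 2048"
    }
}
```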

 

 

> org.apache.spark.storage.StorageStatus NotSerializableException when try to 
> access StorageStatus in a MapPartitionsFunction
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-43108
>                 URL: https://issues.apache.org/jira/browse/SPARK-43108
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.1
>            Reporter: surender godara
>            Priority: Minor
>


