[
https://issues.apache.org/jira/browse/SPARK-43108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
surender godara updated SPARK-43108:
------------------------------------
Description:
When you try to access the *storage status (org.apache.spark.storage.StorageStatus)* inside a MapPartitionsFunction, the getStorageStatus method throws a NotSerializableException, because the StorageStatus object is not serializable.
Here is an example code snippet that demonstrates how to access the storage status inside a MapPartitionsFunction in Spark:
{code:java}
StorageStatus[] storageStatus = SparkEnv.get().blockManager().master().getStorageStatus();{code}
*Error stacktrace:*
{code:java}
Caused by: java.io.NotSerializableException: org.apache.spark.storage.StorageStatus
Serialization stack:
    - object not serializable (class: org.apache.spark.storage.StorageStatus, value: org.apache.spark.storage.StorageStatus@715b4e82)
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.storage.StorageStatus;, size 2)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:286)
    at org.apache.spark.rpc.netty.RemoteNettyRpcCallContext.send(NettyRpcCallContext.scala:64)
    at org.apache.spark.rpc.netty.NettyRpcCallContext.reply(NettyRpcCallContext.scala:32)
    at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:156)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264){code}
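The failure mode in the trace above can be reproduced with plain Java serialization, independent of Spark. The sketch below uses a hypothetical stand-in class (Status) for org.apache.spark.storage.StorageStatus; it is not Spark code, only an illustration of why writeObject fails on an array whose elements are not serializable:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class NotSerializableDemo {
    // Hypothetical stand-in for org.apache.spark.storage.StorageStatus:
    // a plain class that does NOT implement java.io.Serializable.
    static class Status {
        final String blockManagerId = "driver";
    }

    // Attempt Java serialization, as Spark's JavaSerializationStream does.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        // Serializing the array fails on its first non-serializable element,
        // matching "element of array (index: 0)" in the stack trace above.
        Status[] statuses = { new Status(), new Status() };
        System.out.println(trySerialize(statuses));
        // Prints: NotSerializableException: NotSerializableDemo$Status
    }
}
```

This mirrors the reply path in the trace: BlockManagerMasterEndpoint answers the getStorageStatus RPC with a StorageStatus[], and the Java serializer rejects it at element 0.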
*Steps to reproduce*
Step 1: Initialize a SparkSession in Spark standalone mode.
Step 2: Create a Dataset using the SparkSession and load data into it.
Step 3: Define a MapPartitionsFunction on the Dataset and fetch the storage status inside it.
Here is the code snippet for the MapPartitionsFunction:
{code:java}
df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Iterator<Row> input) throws Exception {
        // This call triggers the failure: the block manager master replies
        // with StorageStatus objects over RPC, and they cannot be serialized.
        StorageStatus[] storageStatus =
            SparkEnv.get().blockManager().master().getStorageStatus();
        return input;
    }
}, RowEncoder.apply(df.schema()));
{code}
Step 4: Submit the Spark job.
*Proposed solution:*
Make org.apache.spark.storage.StorageStatus implement the java.io.Serializable interface.
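For illustration only, the sketch below applies the proposed fix to a hypothetical stand-in class (Status, not Spark's actual StorageStatus) and shows that implementing java.io.Serializable is enough for the same array round trip to succeed:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFixDemo {
    // Hypothetical stand-in for StorageStatus with the proposed fix applied:
    // the class now implements java.io.Serializable.
    static class Status implements Serializable {
        private static final long serialVersionUID = 1L;
        final String blockManagerId;
        Status(String blockManagerId) { this.blockManagerId = blockManagerId; }
    }

    // Round-trip an array through Java serialization, as the RPC reply does.
    static Status[] roundTrip(Status[] in) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                 new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Status[]) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Status[] copy = roundTrip(
            new Status[] { new Status("driver"), new Status("executor-1") });
        System.out.println("deserialized " + copy.length
            + " statuses, first=" + copy[0].blockManagerId);
        // Prints: deserialized 2 statuses, first=driver
    }
}
```

Note that for the real class, any non-serializable fields of StorageStatus (and its references such as BlockManagerId and block maps) would also need to be serializable or marked transient; that is not shown here.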
was:
When you try to access the *storage status
(org.apache.spark.storage.StorageStatus)* inside a MapPartitionsFunction,then
getStorageStatus method throw the NotSerializableException. This exception is
thrown because the StorageStatus object is not serializable.
Here is an example code snippet that demonstrates how to access the storage
status inside a MapPartitionsFunction in Spark:
{code:java}
StorageStatus[] storageStatus =
SparkEnv.get().blockManager().master().getStorageStatus();{code}
*Error stacktrace --*
{code:java}
Caused by: java.io.NotSerializableException:
org.apache.spark.storage.StorageStatus
Serialization stack:
- object not serializable (class: org.apache.spark.storage.StorageStatus,
value: org.apache.spark.storage.StorageStatus@715b4e82)
- element of array (index: 0)
- array (class [Lorg.apache.spark.storage.StorageStatus;, size 2)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:286)
at
org.apache.spark.rpc.netty.RemoteNettyRpcCallContext.send(NettyRpcCallContext.scala:64)
at
org.apache.spark.rpc.netty.NettyRpcCallContext.reply(NettyRpcCallContext.scala:32)
at
org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:156)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at
org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264){code}
*Steps to reproduce*
step 1 create spark session with spark standalone mode.
step 2 Create a Dataset using the SparkSession object and load data from a
file or from a database.
step 3 Define the MapPartitionsFunction and get storage status inside.
Here is the code snippet of MapPartitionsFunction
{code:java}
df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
@Override
public Iterator<Row> call(Iterator<Row> input) throws Exception {
StorageStatus[] storageStatus =
SparkEnv.get().blockManager().master().getStorageStatus();
return input;
}
}, RowEncoder.apply(df.schema()));
{code}
Step4 - submit the spark job.
*Solution -*
Implement the Serializable interface for org.apache.spark.storage.StorageStatus.
> org.apache.spark.storage.StorageStatus NotSerializableException when trying
> to access StorageStatus in a MapPartitionsFunction
> ---------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-43108
> URL: https://issues.apache.org/jira/browse/SPARK-43108
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.1
> Reporter: surender godara
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]