[ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
Description:
We have a Spark Streaming application that consumes messages from RabbitMQ and
processes them. When we generate hundreds of events on RabbitMQ and run the
application on a Spark standalone cluster, we see
java.lang.ClassNotFoundException exceptions in the log.
Application Overview:
Our domain model is a simple POJO that represents the RabbitMQ events we want
to consume and contains the custom properties we are interested in:
{code:title=Event.java|borderStyle=solid}
class Event implements java.io.Externalizable {
// custom properties
// custom implementation of writeExternal(), readExternal() methods
}
{code}
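For illustration, here is a minimal sketch of what such an Externalizable event can look like; the field is hypothetical, not the actual properties of com.xxx.Event:
{code:title=Event.java (sketch)|borderStyle=solid}
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class Event implements Externalizable {
    private byte[] payload; // hypothetical property

    // Externalizable requires a public no-arg constructor;
    // it is invoked reflectively during deserialization.
    public Event() { }

    public Event(byte[] body) {
        this.payload = body;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        payload = new byte[in.readInt()];
        in.readFully(payload);
    }
}
{code}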
We have implemented a custom Spark receiver that receives messages from a
RabbitMQ queue by means of a custom consumer (see "Receiving messages by
subscription" at https://www.rabbitmq.com/api-guide.html), converts them to our
custom domain event objects (com.xxx.Event), and stores them in Spark memory:
{code:title=RabbitMQReceiver.java|borderStyle=solid}
byte[] body = ...;             // data received from RabbitMQ by the custom consumer
Event event = new Event(body); // wrap the raw payload in our domain object
store(event);                  // store the event into Spark memory
{code}
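The receiver itself follows the standard custom-receiver pattern from the Spark Streaming programming guide; a rough sketch is below (the host, queue name, and threading details are assumptions):
{code:title=RabbitMQReceiver.java (sketch)|borderStyle=solid}
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQReceiver extends Receiver<Event> {
    private final String host;      // assumed configuration
    private final String queueName; // assumed configuration

    public RabbitMQReceiver(String host, String queueName) {
        super(StorageLevel.MEMORY_ONLY());
        this.host = host;
        this.queueName = queueName;
    }

    @Override
    public void onStart() {
        // Consume on a separate thread so onStart() returns immediately.
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // Connection teardown omitted in this sketch.
    }

    private void receive() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    store(new Event(body)); // hand the event to Spark
                }
            });
        } catch (Exception e) {
            restart("Error receiving from RabbitMQ", e);
        }
    }
}
{code}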
The main program is simple; it just sets up the Spark streaming context:
{code:title=SparkApplication.java|borderStyle=solid}
SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
{code}
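The streaming context itself (the ssc referenced further below) is then created from this configuration; a sketch, where the one-second batch interval is an assumption not stated in this report:
{code:title=SparkApplication.java (sketch)|borderStyle=solid}
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Hypothetical batch interval of 1 second.
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
{code}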
Initialize input streams:
{code:title=SparkApplication.java|borderStyle=solid}
ReceiverInputDStream<Event> stream = ...; // create input stream from RabbitMQ
JavaReceiverInputDStream<Event> events =
        new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
{code}
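An equivalent way to obtain the Java stream, assuming a receiver like the sketch above, is to let the Java API supply the ClassTag via JavaStreamingContext.receiverStream():
{code:title=SparkApplication.java (sketch)|borderStyle=solid}
// Hypothetical host/queue arguments; receiverStream() returns a
// JavaReceiverInputDStream<Event> without an explicit ClassTag.
JavaReceiverInputDStream<Event> events =
        ssc.receiverStream(new RabbitMQReceiver("rabbit-host", "events-queue"));
{code}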
Process events:
{code:title=SparkApplication.java|borderStyle=solid}
events.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // process partition
    });
    return null; // the 1.1 Java API's foreachRDD takes a Function<JavaRDD<Event>, Void>
});

ssc.start();
ssc.awaitTermination();
{code}
Application submission:
The application is packaged into a single fat jar using the Maven Shade plugin
(http://maven.apache.org/plugins/maven-shade-plugin/) and is compiled against
Spark version 1.1.0.
We run the application on a Spark 1.1.0 standalone cluster that consists of a
driver host, a master host, and two worker hosts. We submit the application
from the driver host.
On one of the workers we see java.lang.ClassNotFoundException exceptions. The
log shows that the worker has downloaded application.jar and added it to its
class loader, yet deserialization of com.xxx.Event still fails:
{noformat}
14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
...........
14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Event
        at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:344)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
        at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
        at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
        at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
        at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
        at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
        at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
        at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
        at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
        at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}
We manually checked the jar file, and it does contain the com.xxx.Event class.
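For reference, this presence check can also be scripted; a sketch of how the shaded jar can be verified (the path is a placeholder):
{code:title=JarCheck.java (sketch)|borderStyle=solid}
import java.util.jar.JarFile;

public class JarCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder path to the shaded application jar.
        try (JarFile jar = new JarFile("/path/to/application.jar")) {
            // Prints a non-null entry if com.xxx.Event is packaged in the jar.
            System.out.println(jar.getJarEntry("com/xxx/Event.class"));
        }
    }
}
{code}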
> Spark Java Application : java.lang.ClassNotFoundException
> ---------------------------------------------------------
>
> Key: SPARK-4830
> URL: https://issues.apache.org/jira/browse/SPARK-4830
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.1.0
> Reporter: Mykhaylo Telizhyn
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]