[
https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
Description:
h4. Application Overview:
We have a Spark Streaming application that consumes messages from RabbitMQ
and processes them. When we generate hundreds of events on RabbitMQ and run
our application on a Spark standalone cluster, we see
{{java.lang.ClassNotFoundException}} exceptions in the log.
Our domain model is a simple POJO that represents the RabbitMQ events we want
to consume and contains the custom properties we are interested in:
{code:title=com.xxx.Event.java|borderStyle=solid}
public class Event implements java.io.Externalizable {
// custom properties
// custom implementation of writeExternal(), readExternal() methods
}
{code}
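For concreteness, a minimal sketch of what such a class looks like (the fields shown are illustrative, not our actual properties); note that {{java.io.Externalizable}} requires a public no-arg constructor for deserialization:
{code:title=com.xxx.Event.java (illustrative sketch)|borderStyle=solid}
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class Event implements Externalizable {

    private String type;      // illustrative property
    private long timestamp;   // illustrative property

    // Externalizable mandates a public no-arg constructor
    public Event() { }

    public Event(byte[] body) {
        // parse the raw RabbitMQ message body into the fields above
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(type);
        out.writeLong(timestamp);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        type = in.readUTF();
        timestamp = in.readLong();
    }
}
{code}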
We have implemented a custom Spark Streaming receiver that receives
messages from a RabbitMQ queue by means of a custom consumer (see _"Receiving
messages by subscription"_ at https://www.rabbitmq.com/api-guide.html),
converts them to our custom domain event objects ({{com.xxx.Event}}) and stores
them in Spark memory:
{code:title=RabbitMQReceiver.java|borderStyle=solid}
byte[] body = // data received from Rabbit using custom consumer
Event event = new Event(body);
store(event); // store into Spark
{code}
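For reference, a minimal sketch of how such a receiver plugs into the Spark 1.1 receiver API (the RabbitMQ wiring itself is elided):
{code:title=RabbitMQReceiver.java (illustrative sketch)|borderStyle=solid}
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class RabbitMQReceiver extends Receiver<Event> {

    public RabbitMQReceiver() {
        super(StorageLevel.MEMORY_ONLY());
    }

    @Override
    public void onStart() {
        // start a thread that subscribes to the RabbitMQ queue and,
        // for each delivery, hands the converted event to Spark:
        //   byte[] body = ...;       // received from Rabbit
        //   store(new Event(body));  // store into Spark memory
    }

    @Override
    public void onStop() {
        // close the RabbitMQ channel and connection
    }
}
{code}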
The main program is simple; it just sets up the Spark streaming context:
{code:title=Application.java|borderStyle=solid}
SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));
{code}
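As an aside, if handling the Scala {{Option}} returned by {{SparkContext.jarOfClass}} is awkward from Java, a more Java-friendly variant is possible (a sketch, assuming the {{JavaSparkContext.jarOfClass}} helper available in 1.1):
{code:title=Application.java (alternative sketch)|borderStyle=solid}
import org.apache.spark.api.java.JavaSparkContext;

// jarOfClass returns a String[] here, so no scala.Option handling is needed
sparkConf.setJars(JavaSparkContext.jarOfClass(Application.class));
{code}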
Initialize input streams:
{code:title=Application.java|borderStyle=solid}
ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
JavaReceiverInputDStream<Event> events =
    new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
{code}
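Here {{classTag(Event.class)}} is our own small helper for obtaining a Scala {{ClassTag}} from Java. A minimal sketch of such a helper (the class and method names are ours, not Spark's):
{code:title=ClassTags.java (illustrative sketch)|borderStyle=solid}
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public final class ClassTags {

    private ClassTags() { }

    // build a Scala ClassTag for the given Java class
    public static <T> ClassTag<T> classTag(Class<T> clazz) {
        return ClassTag$.MODULE$.apply(clazz);
    }
}
{code}
(Alternatively, {{JavaStreamingContext.receiverStream(receiver)}} returns a {{JavaReceiverInputDStream}} directly and avoids the {{ClassTag}} plumbing altogether.)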
Process events:
{code:title=Application.java|borderStyle=solid}
events.foreachRDD(
    rdd -> {
        rdd.foreachPartition(
            partition -> {
                // process partition
            });
        return null; // foreachRDD takes a Function<JavaRDD<Event>, Void> in Spark 1.1
    });
ssc.start();
ssc.awaitTermination();
{code}
h4. Application submission:
The application is packaged as a single fat jar file using the Maven Shade
plugin (http://maven.apache.org/plugins/maven-shade-plugin/) and is compiled
against Spark _1.1.0_.
We run the application on a Spark _1.1.0_ standalone cluster that consists of
a driver host, a master host and two worker hosts. We submit the application
from the driver host.
On one of the workers we see {{java.lang.ClassNotFoundException}}
exceptions:
{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#ffffff}
14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Event
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{panel}
We can see that the worker has downloaded application.jar and added it to its
class loader:
{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#ffffff}
14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
{panel}
We also manually inspected the jar file and confirmed that it contains the {{com.xxx.Event}} class.
> Spark Java Application : java.lang.ClassNotFoundException
> ---------------------------------------------------------
>
> Key: SPARK-4830
> URL: https://issues.apache.org/jira/browse/SPARK-4830
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.1.0
> Reporter: Mykhaylo Telizhyn
>