[ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
Description:
h4. Application Overview:
We have a Spark Streaming application that consumes messages from RabbitMQ
and processes them. When we generate hundreds of events on RabbitMQ and run
our application on a Spark standalone cluster, we see
{{java.lang.ClassNotFoundException}} exceptions in the log.
Our domain model is a simple POJO that represents the RabbitMQ events we want
to consume and contains the custom properties we are interested in:
{code:title=com.xxx.Event.java|borderStyle=solid}
public class Event implements java.io.Externalizable {
// custom properties
// custom implementation of writeExternal(), readExternal() methods
}
{code}
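For illustration, here is a minimal sketch of such a class; the {{type}} and {{timestamp}} properties are hypothetical placeholders. Note that {{Externalizable}} requires a public no-arg constructor for deserialization:
{code:title=com.xxx.Event.java (sketch)|borderStyle=solid}
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class Event implements Externalizable {

    private String type;     // hypothetical custom property
    private long timestamp;  // hypothetical custom property

    // Externalizable mandates a public no-arg constructor;
    // deserialization fails without it
    public Event() {
    }

    public Event(byte[] body) {
        // parse the raw RabbitMQ message body into the custom properties
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(type);
        out.writeLong(timestamp);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        type = in.readUTF();
        timestamp = in.readLong();
    }
}
{code}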
We have implemented a custom Spark Streaming receiver that receives
messages from a RabbitMQ queue by means of a custom consumer (see _"Receiving
messages by subscription"_ at https://www.rabbitmq.com/api-guide.html),
converts them to our custom domain event objects ({{com.xxx.Event}}), and stores
them in Spark memory:
{code:title=RabbitMQReceiver.java|borderStyle=solid}
byte[] body = // data received from Rabbit using custom consumer
Event event = new Event(body);
store(event); // store into Spark
{code}
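For context, a minimal sketch of what such a receiver could look like, assuming RabbitMQ's {{DefaultConsumer}} push API; the host and queue name parameters are placeholders:
{code:title=RabbitMQReceiver.java (sketch)|borderStyle=solid}
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQReceiver extends Receiver<Event> {

    private final String host;
    private final String queueName;

    public RabbitMQReceiver(String host, String queueName) {
        super(StorageLevel.MEMORY_ONLY());
        this.host = host;
        this.queueName = queueName;
    }

    @Override
    public void onStart() {
        // receive on a separate thread; onStart() must not block
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // a full implementation would close the RabbitMQ connection here
    }

    private void receive() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // "Receiving messages by subscription": the broker pushes deliveries
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    store(new Event(body)); // hand the event over to Spark
                }
            });
        } catch (Exception e) {
            restart("Error receiving from RabbitMQ", e);
        }
    }
}
{code}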
The main program is simple; it just sets up the Spark streaming context:
{code:title=Application.java|borderStyle=solid}
SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));
{code}
Initialize input streams:
{code:title=Application.java|borderStyle=solid}
ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
JavaReceiverInputDStream<Event> events = new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
{code}
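({{classTag(Event.class)}} is presumably a small helper around {{scala.reflect.ClassTag$.MODULE$.apply(Event.class)}}.) Equivalently, assuming the receiver sketched above, the Java API can build the stream directly and supply the {{ClassTag}} itself:
{code:title=Application.java (sketch)|borderStyle=solid}
JavaReceiverInputDStream<Event> events =
        ssc.receiverStream(new RabbitMQReceiver("rabbitmq-host", "events-queue"));
{code}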
Process events:
{code:title=Application.java|borderStyle=solid}
events.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // process partition
    });
    return null; // in Spark 1.1 foreachRDD takes a Function<JavaRDD<Event>, Void>
});

ssc.start();
ssc.awaitTermination();
{code}
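The partition body is elided above; as a purely illustrative stand-in, it could simply iterate the events ({{foreachPartition}} hands the lambda a {{java.util.Iterator<Event>}}):
{code:title=Application.java (sketch)|borderStyle=solid}
rdd.foreachPartition(partition -> {
    while (partition.hasNext()) {
        Event event = partition.next();
        // hypothetical per-event handling, e.g. writing to a downstream store
    }
});
{code}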
h4. Application submission:
The application is packaged as a single fat jar file using the Maven Shade
plugin (http://maven.apache.org/plugins/maven-shade-plugin/). It is compiled
against Spark version _1.1.0_.
We run the application on a Spark _1.1.0_ standalone cluster that
consists of a driver host, a master host, and two worker hosts. We submit the
application from the driver host.
On one of the workers we see {{java.lang.ClassNotFoundException}}
exceptions:
{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Event
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{panel}
We can see that the worker has downloaded application.jar and added it to its
class loader:
{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
{panel}
We also manually checked the jar file, and it does contain the {{com.xxx.Event}} class.
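(For reference, one way to verify that programmatically:)
{code:title=JarCheck.java (sketch)|borderStyle=solid}
import java.util.jar.JarFile;

public class JarCheck {
    public static void main(String[] args) throws Exception {
        try (JarFile jar = new JarFile("application.jar")) {
            // non-null iff the class file is present in the fat jar
            System.out.println(jar.getJarEntry("com/xxx/Event.class"));
        }
    }
}
{code}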
> Spark Java Application : java.lang.ClassNotFoundException
> ---------------------------------------------------------
>
> Key: SPARK-4830
> URL: https://issues.apache.org/jira/browse/SPARK-4830
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: Mykhaylo Telizhyn
>