[ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
    Description: 
We have a Spark Streaming application that consumes messages from RabbitMQ and processes them. When we generate hundreds of events on RabbitMQ and run the application on a Spark standalone cluster, we see java.lang.ClassNotFoundException exceptions in the log.


Application Overview:


Our domain model is a simple POJO that represents the RabbitMQ events we want to consume and carries the custom properties we are interested in:
{code:title=Event.java|borderStyle=solid}
class Event implements java.io.Externalizable {

    // custom properties

    // custom implementations of the writeExternal() and readExternal() methods
}
{code}
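For illustration only, a minimal sketch of such an event class; the single body field is a hypothetical stand-in for our custom properties:

{code:title=Event.java (sketch)|borderStyle=solid}
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class Event implements Externalizable {

    // hypothetical stand-in for the custom properties
    private byte[] body;

    // Externalizable requires a public no-arg constructor
    public Event() {
    }

    public Event(byte[] body) {
        this.body = body;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(body.length);
        out.write(body);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        body = new byte[in.readInt()];
        in.readFully(body);
    }
}
{code}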
We have implemented a custom Spark receiver that receives messages from a RabbitMQ queue by means of a custom consumer (see "Receiving messages by subscription" at https://www.rabbitmq.com/api-guide.html), converts them to our custom domain event objects (com.xxx.Event), and stores them in Spark memory:
{code:title=RabbitMQReceiver.java|borderStyle=solid}
byte[] body = ... // data received from RabbitMQ via the custom consumer
Event event = new Event(body);
store(event);  // store into Spark memory
{code}
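A minimal sketch of the receiver, assuming Spark's org.apache.spark.streaming.receiver.Receiver API and the RabbitMQ Java client's subscription consumer; the host, queue name, storage level, and threading details are illustrative:

{code:title=RabbitMQReceiver.java (sketch)|borderStyle=solid}
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class RabbitMQReceiver extends Receiver<Event> {

    private final String host;   // illustrative connection settings
    private final String queue;

    public RabbitMQReceiver(String host, String queue) {
        super(StorageLevel.MEMORY_ONLY());
        this.host = host;
        this.queue = queue;
    }

    @Override
    public void onStart() {
        // consume on a separate thread so onStart() returns immediately
        new Thread(this::consume).start();
    }

    private void consume() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // "Receiving messages by subscription", as in the RabbitMQ API guide
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    store(new Event(body));  // store into Spark memory
                }
            });
        } catch (Exception e) {
            restart("Error connecting to RabbitMQ", e);
        }
    }

    @Override
    public void onStop() {
        // connection/channel teardown elided
    }
}
{code}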

The main program is simple; it just sets up the Spark streaming context:
{code:title=SparkApplication.java|borderStyle=solid}
SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
{code}
Initialize input streams:
{code:title=SparkApplication.java|borderStyle=solid}
ReceiverInputDStream<Event> stream = ... // create input stream from RabbitMQ
JavaReceiverInputDStream<Event> events =
        new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
{code}
        Process events:
{code:title=SparkApplication.java|borderStyle=solid}
events.foreachRDD(
        rdd -> {
            rdd.foreachPartition(
                    partition -> {
                        // process partition
                    });
        });

ssc.start();
ssc.awaitTermination();
{code}
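Putting these fragments together, a self-contained sketch of the driver; the application name, batch interval, and receiver arguments are illustrative, and in Spark 1.1 the Java foreachRDD takes a Function<JavaRDD<Event>, Void>, hence the return null:

{code:title=Application.java (sketch)|borderStyle=solid}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class Application {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("rabbitmq-events");
        // ship the jar containing our classes to the executors
        sparkConf.setJars(JavaSparkContext.jarOfClass(Application.class));

        JavaStreamingContext ssc =
                new JavaStreamingContext(sparkConf, new Duration(1000));

        // receiverStream() wraps the custom receiver into an input stream
        JavaReceiverInputDStream<Event> events =
                ssc.receiverStream(new RabbitMQReceiver("rabbit-host", "events"));

        events.foreachRDD(rdd -> {
            rdd.foreachPartition(partition -> {
                // process partition
            });
            return null;
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
{code}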

Application submission:

The application is packaged into a single fat jar using the Maven Shade plugin (http://maven.apache.org/plugins/maven-shade-plugin/) and is compiled against Spark 1.1.0.
We run it on a Spark 1.1.0 standalone cluster consisting of a driver host, a master host, and two worker hosts. We submit the application from the driver host.
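The submission from the driver host is along these lines (assuming spark-submit; master URL and main class are illustrative):

{code}
./bin/spark-submit \
  --class com.xxx.Application \
  --master spark://master-host:7077 \
  application.jar
{code}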
On one of the workers we see java.lang.ClassNotFoundException exceptions, even though its log shows that the worker has fetched application.jar and added it to its class loader:

{code}
14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
{code}

...........

{code}
14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Event
        at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:344)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
        at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
        at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
        at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
        at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
        at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
        at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
        at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
        at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
        at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}



We manually checked the jar file, and it does contain the com.xxx.Event class.
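The check was along these lines (the exact invocation is illustrative):

{code}
jar tf application.jar | grep 'com/xxx/Event.class'
{code}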

> Spark Java Application : java.lang.ClassNotFoundException
> ---------------------------------------------------------
>
>                 Key: SPARK-4830
>                 URL: https://issues.apache.org/jira/browse/SPARK-4830
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Mykhaylo Telizhyn
>


