[ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
    Description: 
h4. Application Overview:
  
       We have a Spark Streaming application that consumes messages from RabbitMQ and processes them. When we generate hundreds of events on RabbitMQ and run the application on a Spark standalone cluster, we see {{java.lang.ClassNotFoundException}} exceptions in the log.

Our domain model is a simple POJO that represents the RabbitMQ events we want to consume; it carries the custom properties we are interested in:
{code:title=com.xxx.Event.java|borderStyle=solid}
        public class Event implements java.io.Externalizable {
    
            // custom properties

            // custom implementation of writeExternal(), readExternal() methods
        }    
{code}
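
For reference, a minimal sketch of what such an {{Externalizable}} event can look like (the {{type}}/{{payload}} properties and the wire format here are illustrative placeholders, not our real fields):
{code:title=com.xxx.Event.java|borderStyle=solid}
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class Event implements Externalizable {

    // illustrative properties; the real class carries domain-specific fields
    private String type;
    private byte[] payload;

    // a public no-arg constructor is required by Externalizable
    public Event() { }

    public Event(byte[] body) {
        this.payload = body;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(type == null ? "" : type);
        out.writeInt(payload.length);
        out.write(payload);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        type = in.readUTF();
        payload = new byte[in.readInt()];
        in.readFully(payload);
    }
}
{code}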

        We have implemented a custom Spark Streaming receiver that receives messages from a RabbitMQ queue via a custom consumer (see _"Receiving messages by subscription"_ at https://www.rabbitmq.com/api-guide.html), converts them to our domain event objects ({{com.xxx.Event}}), and stores them in Spark memory:
{code:title=RabbitMQReceiver.java|borderStyle=solid}
            byte[] body = // data received from Rabbit using custom consumer
            Event event = new Event(body);
            store(event);  // store into Spark
{code}                
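
For context, here is a hedged sketch of how such a receiver can be structured on top of Spark's {{Receiver}} API and the RabbitMQ Java client (the host and queue name are placeholders, and reconnection/error handling is simplified):
{code:title=RabbitMQReceiver.java|borderStyle=solid}
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class RabbitMQReceiver extends Receiver<Event> {

    private final String host;       // placeholder
    private final String queueName;  // placeholder

    public RabbitMQReceiver(String host, String queueName) {
        super(StorageLevel.MEMORY_ONLY());
        this.host = host;
        this.queueName = queueName;
    }

    @Override
    public void onStart() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // subscribe as in "Receiving messages by subscription"
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) {
                    store(new Event(body));  // hand the event over to Spark
                }
            });
        } catch (Exception e) {
            restart("Could not connect to RabbitMQ", e);
        }
    }

    @Override
    public void onStop() {
        // connection/channel teardown omitted in this sketch
    }
}
{code}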

        The main program is simple; it just sets up the Spark streaming context:
{code:title=Application.java|borderStyle=solid}
            SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
            sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());

            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));
{code}

        Initialize input streams:
{code:title=Application.java|borderStyle=solid}            
            ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
            JavaReceiverInputDStream<Event> events = new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
{code}            
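
The {{classTag(...)}} call above is a small helper of ours, not a Spark API; one minimal way to implement it, shown here for completeness, is to build a Scala {{ClassTag}} from a Java {{Class}}:
{code:title=Application.java|borderStyle=solid}
    // Helper assumed by the snippet above (not part of Spark's API):
    private static <T> scala.reflect.ClassTag<T> classTag(Class<T> clazz) {
        return scala.reflect.ClassTag$.MODULE$.apply(clazz);
    }
{code}
Alternatively, {{JavaStreamingContext.receiverStream()}} returns a {{JavaReceiverInputDStream}} directly when given a custom receiver, which avoids the manual wrapping.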

        Process events:
{code:title=Application.java|borderStyle=solid}            
            events.foreachRDD(
                    rdd -> {
                        rdd.foreachPartition(
                                partition -> {
                                    // process partition
                                }
                        );
                        return null;  // foreachRDD takes Function<JavaRDD<Event>, Void> in Spark 1.1
                    });
                    
            ssc.start();
            ssc.awaitTermination();
{code}
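
Purely for illustration, the partition handler above receives a {{java.util.Iterator<Event>}}; a placeholder body (not our real processing) would look like:
{code:title=Application.java|borderStyle=solid}
            partition -> {
                while (partition.hasNext()) {
                    Event event = partition.next();
                    // placeholder: real per-event processing goes here
                }
            }
{code}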

h4. Application submission:            
            
        The application is packaged as a single fat jar using the Maven Shade plugin (http://maven.apache.org/plugins/maven-shade-plugin/). It is compiled against Spark version _1.1.0_.
        We run the application on a Spark _1.1.0_ standalone cluster that consists of a driver host, a master host, and two worker hosts, and we submit the application from the driver host.

        On one of the workers we see {{java.lang.ClassNotFoundException}} exceptions:
{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#ffffff}
14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Event
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:344)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
    at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
    at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
    at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
    at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
    at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
    at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
    at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
    at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{panel}

    We can see that the worker has downloaded application.jar and added it to its class loader:
{panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#ffffff}
14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
{panel}

  We also manually checked the jar file, and it does contain the {{com.xxx.Event}} class.

> Spark Java Application : java.lang.ClassNotFoundException
> ---------------------------------------------------------
>
>                 Key: SPARK-4830
>                 URL: https://issues.apache.org/jira/browse/SPARK-4830
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Mykhaylo Telizhyn
>


