[ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376335#comment-14376335 ]
sam commented on SPARK-4830:
----------------------------
I've been seeing a similar problem, but in a regular Spark job (no
streaming). The executor logs also contain a storm of other exceptions (lots
of IO and connection exceptions) that all seem unrelated to each other. My
suspicion is that this is caused by
https://issues.apache.org/jira/browse/SPARK-3768 (also read the linked JIRAs), and
that the storm of nonsense exceptions appears because YARN is killing off the
executors with heavy kill signals: the JVMs are mid-process and cannot cope
gracefully.
A workaround appears to be manually setting spark.yarn.executor.memoryOverhead
to something much larger than the default (~400 MB). I'm pretty sure I've seen a
correlation between the number of partitions I use and the chance of seeing
this kind of thing. Anyway, I've had to set my overhead to 12 GB, which seems a
little large, right? Anything lower and I see a mess of exceptions in my worker
logs.
15/03/23 18:08:13 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Main$XxxXxx$3
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:59)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:437)
at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:359)
at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
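For a sense of scale, here is a minimal Java sketch of the per-executor YARN container arithmetic. It assumes the Spark 1.2/1.3 default overhead of max(384 MB, 7% of executor memory); Spark 1.1 used a flat 384 MB and later releases changed the factor, so check the docs for your version. The 4 GB executor heap is a hypothetical value, and the 12288 MB figure mirrors the 12 GB override mentioned above.

```java
public class YarnContainerSizing {

    // Assumed Spark 1.2/1.3 defaults: overhead = max(7% of executor memory, 384 MB).
    static final double OVERHEAD_FACTOR = 0.07;
    static final int OVERHEAD_MIN_MB = 384;

    // Default spark.yarn.executor.memoryOverhead (MB) for a given executor heap size (MB).
    static int defaultOverheadMb(int executorMemoryMb) {
        return Math.max((int) (OVERHEAD_FACTOR * executorMemoryMb), OVERHEAD_MIN_MB);
    }

    // Memory requested from YARN per executor container: heap plus off-heap overhead.
    static int containerRequestMb(int executorMemoryMb, int overheadMb) {
        return executorMemoryMb + overheadMb;
    }

    public static void main(String[] args) {
        int executorMemoryMb = 4096; // hypothetical 4 GB executor heap

        int defaultOverhead = defaultOverheadMb(executorMemoryMb);
        System.out.println(defaultOverhead);                                      // 384
        System.out.println(containerRequestMb(executorMemoryMb, defaultOverhead)); // 4480
        System.out.println(containerRequestMb(executorMemoryMb, 12288));           // 16384
    }
}
```

On a real cluster the override is given in megabytes, e.g. `spark-submit --conf spark.yarn.executor.memoryOverhead=12288 ...`. YARN may additionally round each request up to its minimum allocation increment, so the granted container can be somewhat larger than this sum.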
> Spark Streaming Java Application : java.lang.ClassNotFoundException
> -------------------------------------------------------------------
>
> Key: SPARK-4830
> URL: https://issues.apache.org/jira/browse/SPARK-4830
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: Mykhaylo Telizhyn
>
> h4. Application Overview:
>
> We have a Spark Streaming application that consumes messages from
> RabbitMQ and processes them. When generating hundreds of events on RabbitMQ
> and running our application on a Spark standalone cluster, we see some
> {{java.lang.ClassNotFoundException}} exceptions in the log.
> Our domain model is a simple POJO that represents the RabbitMQ events we want to
> consume and contains some custom properties we are interested in:
> {code:title=com.xxx.Event.java|borderStyle=solid}
> public class Event implements java.io.Externalizable {
>
> // custom properties
> // custom implementation of writeExternal(), readExternal() methods
> }
> {code}
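A self-contained sketch of what such a class might look like, assuming a single hypothetical `byte[]` payload field in place of the elided custom properties. Externalizable requires a public no-arg constructor, which the deserializer calls before `readExternal()`. The round trip uses plain `ObjectOutputStream`/`ObjectInputStream`, the same JDK machinery that Spark's JavaSerializer drives in the stack traces (`JavaDeserializationStream.readObject` calling into `ObjectInputStream.readObject`).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

public class EventRoundTrip {

    // Hypothetical stand-in for com.xxx.Event: one byte[] payload instead of the real custom properties.
    public static class Event implements Externalizable {
        private byte[] body;

        // Required by Externalizable: the deserializer instantiates via this constructor, then calls readExternal().
        public Event() {}

        public Event(byte[] body) {
            this.body = body;
        }

        public byte[] getBody() {
            return body;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(body.length); // length prefix so readExternal knows how much to read
            out.write(body);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            body = new byte[in.readInt()];
            in.readFully(body);
        }
    }

    // Serialize and deserialize with the JDK object streams, as Spark's JavaSerializer does.
    static Event roundTrip(Event event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                // readObject() looks the class up by name; a name no loader can find throws ClassNotFoundException.
                return (Event) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Event copy = roundTrip(new Event("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(copy.getBody(), StandardCharsets.UTF_8)); // hello
    }
}
```

If this round trip works locally but fails on a worker, the class itself is likely fine and the problem is which class loader performs the lookup during deserialization.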
> We have implemented a custom Spark Streaming receiver that simply
> receives messages from a RabbitMQ queue by means of a custom consumer (see
> _"Receiving messages by subscription"_ at
> https://www.rabbitmq.com/api-guide.html), converts them to our custom domain
> event objects ({{com.xxx.Event}}) and stores them in Spark memory:
> {code:title=RabbitMQReceiver.java|borderStyle=solid}
> byte[] body = // data received from Rabbit using custom consumer
> Event event = new Event(body);
> store(event); // store into Spark
> {code}
> The main program is simple; it just sets up the Spark Streaming context:
> {code:title=Application.java|borderStyle=solid}
> SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
>
> sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> new Duration(BATCH_DURATION_MS));
> {code}
> Initialize the input streams:
> {code:title=Application.java|borderStyle=solid}
> ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
> JavaReceiverInputDStream<Event> events =
>     new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
> {code}
> Process events:
> {code:title=Application.java|borderStyle=solid}
> events.foreachRDD(rdd -> {
>     rdd.foreachPartition(partition -> {
>         // process partition
>     });
>     return null; // in Spark 1.x the Java foreachRDD takes a Function<JavaRDD<T>, Void>
> });
>
> ssc.start();
> ssc.awaitTermination();
> {code}
> h4. Application submission:
>
> The application is packaged as a single fat jar file using the Maven Shade
> plugin (http://maven.apache.org/plugins/maven-shade-plugin/). It is compiled
> against Spark version _1.1.0_.
> We run our application on a Spark _1.1.0_ standalone cluster that consists
> of a driver host, a master host and two worker hosts. We submit the
> application from the driver host.
>
> On one of the workers we see {{java.lang.ClassNotFoundException}}
> exceptions:
> {panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
> 14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
> java.lang.ClassNotFoundException: com.xxx.Event
> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:344)
> at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
> at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
> at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
> at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
> at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
> at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
> at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
> at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
> at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
> at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {panel}
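One detail in this trace (and in the one above) is worth spelling out: Spark's `JavaDeserializationStream.resolveClass` calls `Class.forName` with one specific class loader, and the frames suggest that loader was `sun.misc.Launcher$AppClassLoader`, the JVM's system class loader. A jar registered with a different, executor-side loader would not be visible to it. The JDK-only sketch below illustrates the general effect; `Payload` is a hypothetical stand-in for `com.xxx.Event`, and the isolated loader stands in for any loader that lacks the application jar.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderVisibility {

    // Hypothetical application class standing in for com.xxx.Event.
    public static class Payload {}

    // Resolve a class by name against one specific loader, similar to the
    // Class.forName(name, false, loader) call made from the resolveClass frame.
    static boolean canResolve(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // A loader with no URLs whose parent is the bootstrap loader (null): it sees JDK classes only.
    static ClassLoader jdkOnlyLoader() {
        return new URLClassLoader(new URL[0], null);
    }

    public static void main(String[] args) {
        String name = Payload.class.getName();
        ClassLoader appLoader = LoaderVisibility.class.getClassLoader();
        ClassLoader isolated = jdkOnlyLoader();

        System.out.println(canResolve(name, appLoader));              // true: this loader has the class
        System.out.println(canResolve(name, isolated));               // false: same name, different loader
        System.out.println(canResolve("java.lang.String", isolated)); // true: JDK classes are always visible
    }
}
```

The class bytes are identical in both cases; only the loader asked to find them differs, which is consistent with a jar that contains the class yet still produces ClassNotFoundException.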
> We see that the worker has downloaded application.jar and added it to the
> class loader:
> {panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
> 14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
> 14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
> 14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
> 14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
> {panel}
> We also manually checked the jar file, and it contains the {{com.xxx.Event}} class.
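The manual jar check can also be scripted. A minimal sketch with `java.util.jar`, assuming the class is stored under its binary name (`com/xxx/Event.class`); nested and anonymous classes such as `Main$XxxXxx$3` from the first trace are separate entries and need their own check. `writeDemoJar` is a hypothetical helper that builds a throwaway jar so the example runs without the real application.jar.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

public class JarContainsClass {

    // Binary class name -> jar entry path, e.g. com.xxx.Event -> com/xxx/Event.class.
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // True if the jar has an entry for the class. This checks packaging only, not which loader can see it.
    static boolean jarContainsClass(File jar, String className) {
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getJarEntry(entryName(className)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical helper: builds a throwaway jar with empty placeholder entries so the demo runs offline.
    static File writeDemoJar(String... classNames) {
        try {
            File jar = File.createTempFile("application", ".jar");
            jar.deleteOnExit();
            try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar))) {
                for (String className : classNames) {
                    out.putNextEntry(new JarEntry(entryName(className)));
                    out.closeEntry();
                }
            }
            return jar;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File jar = writeDemoJar("com.xxx.Event");
        System.out.println(jarContainsClass(jar, "com.xxx.Event"));        // true
        System.out.println(jarContainsClass(jar, "com.xxx.Main$XxxXxx$3")); // false
    }
}
```

From a shell, `jar tf application.jar | grep com/xxx/Event.class` gives the same answer for a real jar.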
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)