Hi everyone, I am using val messages = KafkaUtils.createDirectStream[String, 
String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) to read data 
from kafka(1k/second), and store the data in windows,the code snippets as 
follow:        val windowedStreamChannel = 
streamChannel.combineByKey[TreeSet[Obj]](TreeSet[Obj](_), _ += _, _ ++= _, new 
HashPartitioner(numPartition))
          .reduceByKeyAndWindow((x: TreeSet[Obj], y: TreeSet[Obj]) => x ++= y,
            (x: TreeSet[Obj], y: TreeSet[Obj]) => x --= y, Minutes(60), 
Seconds(2), numPartition,
            (item: (String, TreeSet[Obj])) => item._2.size != 0)after the 
application  run for an hour,  I kill the application and restart it from 
checkpoint directory, but I  encountered an exception:2015-04-27 17:52:40,955 
INFO  [Driver] - Slicing from 1430126222000 ms to 1430126222000 ms (aligned to 
1430126222000 ms and 1430126222000 ms)
2015-04-27 17:52:40,958 ERROR [Driver] - User class threw exception: null
java.lang.StackOverflowError
        at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
        at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
        at java.io.File.exists(File.java:813)
        at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1080)
        at sun.misc.URLClassPath.getResource(URLClassPath.java:199)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:358)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:190)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
        at org.apache.spark.rdd.RDD.filter(RDD.scala:303)
        at 
org.apache.spark.streaming.dstream.FilteredDStream$$anonfun$compute$1.apply(FilteredDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.FilteredDStream$$anonfun$compute$1.apply(FilteredDStream.scala:35)
        at scala.Option.map(Option.scala:145)
        at 
org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at 
org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at 
org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at 
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at 
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$slice$4.apply(DStream.scala:778)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$slice$4.apply(DStream.scala:777)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:777)
        at 
org.apache.spark.streaming.dstream.ReducedWindowedDStream.compute(ReducedWindowedDStream.scala:116)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at 
org.apache.spark.streaming.dstream.ReducedWindowedDStream.compute(ReducedWindowedDStream.scala:121)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at 
org.apache.spark.streaming.dstream.ReducedWindowedDStream.compute(ReducedWindowedDStream.scala:121)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)        (many log like above)

the full log is in 
https://gist.githubusercontent.com/397090770/bb53fb2bf01447cefc2e/raw/gistfile1.txt



Reply via email to