[ https://issues.apache.org/jira/browse/SPARK-18531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Saleem Ansari updated SPARK-18531:
----------------------------------
    Description: 
More details can be found here: 
https://gist.github.com/tuxdna/37a69b53e6f9a9442fa3b1d5e53c2acb



The Spark FPGrowth algorithm fails with a java.lang.StackOverflowError even on a small dataset, as shown below.


$ spark-shell --master "local[*]" --driver-memory 5g
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_102)
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.mllib.fpm.FPGrowth

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val data = sc.textFile("bug.data")
data: org.apache.spark.rdd.RDD[String] = bug.data MapPartitionsRDD[1] at textFile at <console>:31

scala> val transactions: RDD[Array[String]] = data.map(l => l.split(",").distinct)
transactions: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:33

scala> transactions.cache()
res0: transactions.type = MapPartitionsRDD[2] at map at <console>:33

scala> val fpg = new FPGrowth().setMinSupport(0.05).setNumPartitions(10)
fpg: org.apache.spark.mllib.fpm.FPGrowth = org.apache.spark.mllib.fpm.FPGrowth@66d62c59

scala> val model = fpg.run(transactions)
model: org.apache.spark.mllib.fpm.FPGrowthModel[String] = org.apache.spark.mllib.fpm.FPGrowthModel@6e92f150

scala> model.freqItemsets.take(1).foreach { i => i.items.mkString("[", ",", "]") + ", " + i.freq }
[Stage 3:>                                                          (0 + 2) / 2]
16/11/21 23:56:14 ERROR Executor: Managed memory leak detected; size = 18068980 bytes, TID = 14
16/11/21 23:56:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 14)
java.lang.StackOverflowError
    at org.xerial.snappy.Snappy.arrayCopy(Snappy.java:84)
    at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:273)
    at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:115)
    at org.apache.spark.io.SnappyOutputStreamWrapper.write(CompressionCodec.scala:202)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)



This failure is likely due to the size of the baskets: some transactions contain over a thousand items.

scala> val maxBasketSize = transactions.map(_.length).max()
maxBasketSize: Int = 1171                                                       

scala> transactions.filter(_.length == maxBasketSize).collect()
res3: Array[Array[String]] = Array(Array(3858, 109, 5842, 2184, 2481, 534
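
A possible workaround, sketched below, is to drop the unusually large baskets before mining. This is only a sketch: the 100-item cutoff and the larger thread-stack setting are illustrative assumptions, not confirmed fixes.

// Hedged workaround sketch: the StackOverflowError is raised while Java
// serialization walks a deeply nested object graph, which appears tied to the
// 1000+ item baskets, so excluding them may let the job finish. The 100-item
// cutoff below is an arbitrary, illustrative threshold.
val trimmed = transactions.filter(_.length <= 100)
val trimmedModel = fpg.run(trimmed)
trimmedModel.freqItemsets.take(5).foreach { i =>
  println(i.items.mkString("[", ",", "]") + ", " + i.freq)
}

// Another avenue (also an assumption, not verified here) is a larger JVM thread
// stack for both the driver and the executors when launching the shell:
// $ spark-shell --master "local[*]" --driver-memory 5g \
//     --driver-java-options "-Xss32m" \
//     --conf spark.executor.extraJavaOptions=-Xss32m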





> Apache Spark FPGrowth algorithm implementation fails with 
> java.lang.StackOverflowError
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-18531
>                 URL: https://issues.apache.org/jira/browse/SPARK-18531
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: Saleem Ansari
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
