[ https://issues.apache.org/jira/browse/SPARK-19141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174685#comment-16174685 ]

Weichen Xu edited comment on SPARK-19141 at 9/27/17 12:12 AM:
--------------------------------------------------------------

Maybe we need to design a sparse format of AttributeGroup for ML vector columns. We 
don't need to create an Attribute for every vector dimension; a better approach is 
to create one only when it is actually needed. But `VectorAssembler` currently 
creates an attribute for each dimension in every case. 
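
A minimal sketch of what such a sparse representation could look like (hypothetical class, not existing Spark code): keep the group size plus a map that holds attributes only for the indices that actually carry metadata, instead of materializing a NumericAttribute per slot.

{code:title=SparseAttributeGroupSketch.scala|borderStyle=solid}
import org.apache.spark.ml.attribute.Attribute

// Hypothetical sketch only: attributes are stored sparsely, keyed by index.
case class SparseAttributeGroup(
    name: String,
    size: Int,                    // logical vector length
    attrs: Map[Int, Attribute]) { // only the indices that really have metadata

  // Most indices simply return None instead of a freshly allocated attribute.
  def getAttr(idx: Int): Option[Attribute] = attrs.get(idx)
}
{code}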



> VectorAssembler metadata causing memory issues
> ----------------------------------------------
>
>                 Key: SPARK-19141
>                 URL: https://issues.apache.org/jira/browse/SPARK-19141
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, MLlib
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0
>         Environment: Windows 10, Ubuntu 16.04.1, Scala 2.11.8, Spark 1.6.0, 
> 2.0.0, 2.1.0
>            Reporter: Antonia Oprescu
>
> VectorAssembler produces unnecessary metadata that overflows the Java heap in 
> the case of sparse vectors. In the example below, the logical length of the 
> vector is 10^6, but the number of non-zero values is only 2.
> The problem arises when the vector assembler creates metadata (ML attributes) 
> for each of the 10^6 slots, even if this metadata didn't exist upstream (i.e. 
> HashingTF doesn't produce metadata per slot). Here is a chunk of metadata it 
> produces:
> {noformat}
> {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"HashedFeat_0"},{"idx":1,"name":"HashedFeat_1"},{"idx":2,"name":"HashedFeat_2"},{"idx":3,"name":"HashedFeat_3"},{"idx":4,"name":"HashedFeat_4"},{"idx":5,"name":"HashedFeat_5"},{"idx":6,"name":"HashedFeat_6"},{"idx":7,"name":"HashedFeat_7"},{"idx":8,"name":"HashedFeat_8"},{"idx":9,"name":"HashedFeat_9"},...,{"idx":1000000,"name":"Feat01"}]},"num_attrs":1000001}}
> {noformat}
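> The attached metadata can be inspected directly on the output column's schema (column name as in the repro program further below); a minimal check, assuming the repro's {{processedData}}:
> {code}
> // Dump the ML attribute metadata that VectorAssembler attached to the output column.
> // "Features" is the assembler's output column in the repro below.
> println(processedData.schema("Features").metadata.json)
> {code}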
> In this lightweight example, the feature size limit seems to be 1,000,000 when 
> run locally, but this scales poorly with more complicated routines. With a 
> larger dataset and a learner (say LogisticRegression), it runs out of memory at 
> hash sizes anywhere between 10k and 100k, even on a decent-sized cluster.
> I did some digging, and it seems that the only metadata downstream learners 
> actually need is the part that marks categorical columns. I therefore thought 
> of the following possible solutions:
> 1. A compact representation of the ML attribute metadata (but this seems to be 
> a bigger change)
> 2. Removal of non-categorical tags from the metadata created by the 
> VectorAssembler
> 3. An option on the existing VectorAssembler to skip unnecessary ML attributes, 
> or creating another transformer altogether (see the sketch below)
> I would be happy to take a stab at any of these solutions, but I need some 
> direction from the Spark community.
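> To illustrate the direction of option 3, here is a rough sketch (hypothetical code, not an existing Spark API) of assembling the two columns with a plain UDF so that no per-slot attributes are ever materialized. It assumes Spark 2.x and `org.apache.spark.ml.linalg` vectors, and reuses the column names from the repro below:
> {code:title=NoMetadataAssembleSketch.scala|borderStyle=solid}
> import org.apache.spark.ml.linalg.{Vector, Vectors}
> import org.apache.spark.sql.functions.{col, udf}
>
> // Hypothetical helper: concatenate a (sparse) vector column and a scalar column
> // into one sparse vector, without writing any ML attribute metadata.
> val assembleNoMeta = udf { (v: Vector, d: Double) =>
>   val sv = v.toSparse
>   Vectors.sparse(sv.size + 1, sv.indices :+ sv.size, sv.values :+ d)
> }
>
> // Usage with the repro's columns:
> // val processedData = hashedData
> //   .withColumn("Features", assembleNoMeta(col("HashedFeat"), col("Feat01")))
> //   .select("Label", "Features")
> {code}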
> {code:title=VABug.scala|borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}
> import org.apache.spark.sql.SparkSession
>
> object VARepro {
>   case class Record(Label: Double, Feat01: Double, Feat02: Array[String])
>
>   def main(args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("Vector assembler bug")
>       .setMaster("local[*]")
>     val spark = SparkSession.builder.config(conf).getOrCreate()
>     import spark.implicits._
>
>     val df = Seq(Record(1.0, 2.0, Array("4daf")), Record(0.0, 3.0, Array("a9ee"))).toDS()
>
>     // Hash the string features into a very wide, but sparse, vector column.
>     val numFeatures = 10000000
>     val hashingScheme = new HashingTF().setInputCol("Feat02").setOutputCol("HashedFeat").setNumFeatures(numFeatures)
>     val hashedData = hashingScheme.transform(df)
>
>     // Assembling the sparse column with a scalar column creates per-slot ML attributes.
>     val vectorAssembler = new VectorAssembler().setInputCols(Array("HashedFeat", "Feat01")).setOutputCol("Features")
>     val processedData = vectorAssembler.transform(hashedData).select("Label", "Features")
>
>     processedData.show()
>   }
> }
> {code}
> *Stacktrace from the example above:*
> {noformat}
> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
>       at org.apache.spark.ml.attribute.NumericAttribute.copy(attributes.scala:272)
>       at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:215)
>       at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:195)
>       at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:71)
>       at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:70)
>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>       at scala.collection.IterableLike$class.copyToArray(IterableLike.scala:254)
>       at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
>       at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
>       at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
>       at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
>       at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
>       at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>       at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>       at scala.Option.map(Option.scala:146)
>       at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
>       at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
>       at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:95)
>       at VARepro$.main(VARepro.scala:36)
> {noformat}
> *Exception when run in conjunction with a learner on a bigger dataset (~10 GB) on a cluster:*
> {noformat}
> : java.lang.OutOfMemoryError: Java heap space
>       at java.util.Arrays.copyOf(Arrays.java:3236)
>       at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>       at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>       at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>       at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>       at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>       at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>       at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
> {noformat}


