[ 
https://issues.apache.org/jira/browse/SPARK-17549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505411#comment-15505411
 ] 

Yin Huai commented on SPARK-17549:
----------------------------------

I forgot to say: thank you for the investigation! Should we first get a test 
in to guard against wrong stats?

> InMemoryRelation doesn't scale to large tables
> ----------------------------------------------
>
>                 Key: SPARK-17549
>                 URL: https://issues.apache.org/jira/browse/SPARK-17549
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0
>            Reporter: Marcelo Vanzin
>            Assignee: Marcelo Vanzin
>             Fix For: 2.0.1, 2.1.0
>
>         Attachments: create_parquet.scala, example_1.6_post_patch.png, 
> example_1.6_pre_patch.png, spark-1.6-2.patch, spark-1.6.patch, spark-2.0.patch
>
>
> An {{InMemoryRelation}} is created when you cache a table. If the table is 
> large, meaning it has a very large number of columns, a very large number 
> of partitions (in the file split sense, not the "table partition" sense), 
> or both, caching it causes an immense amount of memory to be used in the 
> driver.
> The reason is that it uses an accumulator to collect statistics about each 
> partition, and instead of summarizing the data in the driver, it keeps *all* 
> entries in memory.
> I'm attaching a script I used to create a parquet file with 20,000 columns 
> and a single row, which I then copied 500 times so I'd have 500 partitions.
> When doing the following:
> {code}
> sqlContext.read.parquet(...).count()
> {code}
> Everything works fine, both in Spark 1.6 and 2.0. (It's super slow with the 
> settings I used, but it works.)
> I ran spark-shell like this:
> {code}
> ./bin/spark-shell --master 'local-cluster[4,1,4096]' --driver-memory 2g \
>   --conf spark.executor.memory=2g
> {code}
> And ran:
> {code}
> sqlContext.read.parquet(...).cache().count()
> {code}
> You'll see the results in screenshot {{example_1.6_pre_patch.png}}. After 40 
> partitions were processed, there were 40 GenericInternalRow objects with
> 100,000 items each (5 stat info fields * 20,000 columns). So, memory usage 
> was:
> {code}
>   40 * 100000 * (4 * 20 + 24) = 416000000 =~ 400MB
> {code}
> (Note: Integer = 20 bytes, Long = 24 bytes.)
> If I waited until the end, there would be 500 partitions, so ~ 5GB of memory 
> to hold the stats.
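
The estimate above can be reproduced with a few lines of plain Scala. This is only a sketch of the arithmetic as written in the description: the object and method names are made up for illustration, and the per-item cost of `4 * 20 + 24` bytes is taken directly from the formula above rather than measured.

```scala
// Back-of-the-envelope estimate of driver memory held by per-partition stats.
// All names are illustrative; the constants come from the issue description.
object StatsMemoryEstimate {
  val columns: Long = 20000L
  val statFieldsPerColumn: Long = 5L

  // Items in each GenericInternalRow: 5 stat fields * 20,000 columns = 100,000
  val itemsPerRow: Long = columns * statFieldsPerColumn

  // Per-item cost used in the description's formula (Integer = 20, Long = 24)
  val bytesPerItem: Long = 4 * 20 + 24

  // One stats row is retained per processed partition
  def estimateBytes(partitions: Long): Long =
    partitions * itemsPerRow * bytesPerItem

  def main(args: Array[String]): Unit = {
    println(estimateBytes(40))   // 416000000
    println(estimateBytes(500))  // 5200000000
  }
}
```

The estimate grows linearly with the number of partitions, which is why the driver footprint keeps climbing as more partitions are cached.
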
> I'm also attaching a patch I made on top of 1.6 that uses just a long 
> accumulator to capture the table size; with that patch memory usage on the 
> driver doesn't keep growing. Also note in the patch that I'm multiplying the 
> column size by the row count, which I think is a different bug in the 
> existing code (those stats should be for the whole batch, not just a single 
> row, right?). I also added {{example_1.6_post_patch.png}} to show the 
> {{InMemoryRelation}} with the patch.
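
The idea behind the patch, keeping one running total instead of every stats row, might look roughly like the following plain-Scala sketch. It does not use Spark's accumulator API and is not the attached patch: `BatchStats`, `RunningTotal`, and `TableSizeSketch` are hypothetical stand-ins. It also assumes the recorded column sizes are per-row values that need to be multiplied by the row count, which is exactly the point the description flags as uncertain.

```scala
// Hypothetical stand-in for the stats of one cached batch; not Spark's class.
final case class BatchStats(rowCount: Long, columnSizesInBytes: Seq[Long])

// Minimal stand-in for a long accumulator: only a running sum is retained,
// so memory use does not grow with the number of batches. Not thread-safe.
final class RunningTotal {
  private var total: Long = 0L
  def add(v: Long): Unit = total += v
  def value: Long = total
}

object TableSizeSketch {
  // Assumption: column sizes are per-row, so scale them by the row count.
  def batchSize(b: BatchStats): Long = b.columnSizesInBytes.sum * b.rowCount

  // Fold every batch into a single Long instead of collecting the stats rows.
  def tableSize(batches: Iterator[BatchStats]): Long = {
    val acc = new RunningTotal
    batches.foreach(b => acc.add(batchSize(b)))
    acc.value
  }
}
```

For example, `TableSizeSketch.tableSize(Iterator(BatchStats(2, Seq(4L, 8L)), BatchStats(3, Seq(4L, 8L))))` folds two batches into one number, and nothing per-batch is kept afterwards.
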
> I also applied a very similar patch on top of Spark 2.0. But there things 
> blow up even more spectacularly when I try to run the count on the cached 
> table. It starts with this error:
> {noformat}
> 14:19:43 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 
> vanzin-st1-3.gce.cloudera.com): java.util.concurrent.ExecutionException: 
> java.lang.Exception: failed to compile: java.lang.IndexOutOfBoundsException: 
> Index: 63235, Size: 1
> (lots of generated code here...)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
>       at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>       at java.util.ArrayList.get(ArrayList.java:411)
>       at org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
>       at org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
>       at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
>       at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
>       at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
>       at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:913)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:911)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:911)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:883)
>       ... 54 more
> {noformat}
> There was a lot more output like that, which made the shell unreadable, so 
> I just killed it. Anyway, I believe the same fix should work there, but I 
> can't be sure because the test seems to fail there for different reasons.


