Marcelo Vanzin created SPARK-17549:
--------------------------------------

             Summary: InMemoryRelation doesn't scale to large tables
                 Key: SPARK-17549
                 URL: https://issues.apache.org/jira/browse/SPARK-17549
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.0, 1.6.0
            Reporter: Marcelo Vanzin


An {{InMemoryRelation}} is created when you cache a table. If the table is 
large, meaning it has a very large number of columns, a very large number of 
partitions (in the file split sense, not the "table partition" sense), or both, 
caching it causes an immense amount of memory to be used in the driver.

The reason is that it uses an accumulator to collect statistics about each 
partition, and instead of summarizing the data in the driver, it keeps *all* 
entries in memory.
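
To make the difference concrete, here is a minimal sketch in plain Scala (no Spark dependency) that contrasts retaining every per-partition stats entry with folding the entries into a single running total. The names {{PartitionStats}}, {{StatsSketch}}, {{collectAll}}, {{totalSize}} and {{fakePartitions}} are hypothetical and exist only for illustration; this is not the actual {{InMemoryRelation}} code.

```scala
// Hypothetical stand-in for one partition's statistics entry; not a Spark class.
final case class PartitionStats(sizeInBytes: Long, columnStats: Vector[Long])

object StatsSketch {
  // Retains every entry: memory grows with partitions * stats per partition.
  def collectAll(parts: Iterator[PartitionStats]): Vector[PartitionStats] =
    parts.toVector

  // Summarizes instead: only a single Long is kept, however many partitions arrive.
  def totalSize(parts: Iterator[PartitionStats]): Long =
    parts.foldLeft(0L)((acc, p) => acc + p.sizeInBytes)

  // Builds `n` fake partitions, each carrying `statsPerPartition` stat values.
  def fakePartitions(n: Int, statsPerPartition: Int): Iterator[PartitionStats] =
    Iterator.tabulate(n) { i =>
      PartitionStats(1024L * (i + 1), Vector.fill(statsPerPartition)(0L))
    }
}
```

With {{collectAll}} the number of retained values is the partition count times the stats per partition, while {{totalSize}} keeps a constant amount of state.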

I'm attaching a script I used to create a parquet file with 20,000 columns and 
a single row, which I then copied 500 times so I'd have 500 partitions.

When doing the following:

{code}
sqlContext.read.parquet(...).count()
{code}

Everything works fine, both in Spark 1.6 and 2.0. (It's super slow with the 
settings I used, but it works.)

I ran spark-shell like this:

{code}
./bin/spark-shell --master 'local-cluster[4,1,4096]' --driver-memory 2g --conf spark.executor.memory=2g
{code}

And ran:

{code}
sqlContext.read.parquet(...).cache().count()
{code}

You'll see the results in screenshot {{example_1.6_pre_patch.png}}. After 40 
partitions were processed, there were 40 GenericInternalRow objects with
100,000 items each (5 stat info fields * 20,000 columns). So, memory usage was:

{code}
  40 * 100000 * (4 * 20 + 24) = 416000000 =~ 400MB
{code}

(Note: Integer = 20 bytes, Long = 24 bytes.)

If I waited until the end, there would be 500 partitions, so ~ 5GB of memory to 
hold the stats.
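
For reference, the arithmetic above can be reproduced with a small Scala sketch. It takes the per-item cost of 4 * 20 + 24 bytes at face value, exactly as written in the formula, and the object and member names are hypothetical.

```scala
object StatsMemoryEstimate {
  val statFieldsPerColumn = 5
  val columns = 20000

  // 5 stat fields * 20,000 columns = 100,000 items per partition.
  val itemsPerPartition: Long = statFieldsPerColumn.toLong * columns

  // Per-item cost as used in the formula above (Integer = 20 bytes, Long = 24 bytes).
  val bytesPerItem: Long = 4 * 20 + 24

  // Estimated driver memory held by the stats after `partitions` partitions.
  def driverBytes(partitions: Int): Long =
    partitions * itemsPerPartition * bytesPerItem
}
```

{{StatsMemoryEstimate.driverBytes(40)}} gives 416000000 and {{StatsMemoryEstimate.driverBytes(500)}} gives 5200000000, matching the ~400MB and ~5GB figures.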


I'm also attaching a patch I made on top of 1.6 that uses just a long 
accumulator to capture the table size; with that patch memory usage on the 
driver doesn't keep growing. Also note in the patch that I'm multiplying the 
column size by the row count, which I think is a different bug in the existing 
code (those stats should be for the whole batch, not just a single row, 
right?). I also added {{example_1.6_post_patch.png}} to show the 
{{InMemoryRelation}} with the patch.


I also applied a very similar patch on top of Spark 2.0. But there things blow 
up even more spectacularly when I try to run the count on the cached table. It 
starts with this error:

{noformat}
14:19:43 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, vanzin-st1-3.gce.cloudera.com): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
(lots of generated code here...)
Caused by: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
        at java.util.ArrayList.rangeCheck(ArrayList.java:635)
        at java.util.ArrayList.get(ArrayList.java:411)
        at org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
        at org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
        at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
        at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
        at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
        at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:913)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:911)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:911)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:883)
        ... 54 more
{noformat}

The output kept filling up with errors like that until it became unreadable, 
so I just killed the shell. I believe the same fix should work there, but I 
can't be sure, because the test fails on 2.0 for what seem to be different 
reasons.



