[ https://issues.apache.org/jira/browse/SPARK-17549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15504950#comment-15504950 ]

Marcelo Vanzin commented on SPARK-17549:
----------------------------------------

[~yhuai]

Something about my fix was bothering me, and the test below (a modified version 
of the test in the patch) shows it:

{code}
  test("SPARK-17549: cached table size should be correctly calculated") {
    val data = spark.sparkContext.parallelize(1 to 10, 5).map { i => (i, i) 
}.toDF("col1", "col2")
    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)

    // Materialize the data.
    val expectedAnswer = data.collect()
    checkAnswer(cached, expectedAnswer)

    // Check that the right size was calculated.
    assert(cached.batchStats.value === 2 * expectedAnswer.size * 
INT.defaultSize)

    // Create a projection of the cached data and make sure the statistics are 
kept correctly
    // for each plan.
    val projected = cached.withOutput(Seq(plan.output.head))
    val expectedAnswer2 = data.select("col1").collect()
    checkAnswer(projected, expectedAnswer2)
    assert(projected.batchStats.value === expectedAnswer.size * INT.defaultSize)
  }
{code}

Basically, my patch has a problem: any relation derived from the cached data 
(via either the {{newInstance}} or the {{withOutput}} method of 
{{InMemoryRelation}}) shares the same accumulator as the original instance. So 
if the original table is materialized before those transformations happen, the 
derived relation ends up reporting the original's stats; in the test above, the 
last assertion fails because {{projected.batchStats}} still reflects both 
columns of the cached table instead of just the projected one. I'm also not 
sure how to fix that: going back to the original code is bad, and the other 
option I see is calculating the stats for the new instance by transforming the 
cached RDD, but that's kinda wasteful (it would run unnecessary tasks and waste 
cache memory by caching the same blocks twice).
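
To make the sharing concrete, here's a minimal, self-contained sketch of the 
behavior (plain Scala, not Spark code; {{FakeRelation}} and the byte counts are 
made up for illustration):

{code}
import java.util.concurrent.atomic.LongAdder

// Stand-in for InMemoryRelation: the derived copy keeps a reference to the
// *same* stats counter instead of getting one of its own.
case class FakeRelation(output: Seq[String], batchStats: LongAdder) {
  def withOutput(newOutput: Seq[String]): FakeRelation = copy(output = newOutput)
  def materialize(batchSizeInBytes: Long): Unit = batchStats.add(batchSizeInBytes)
}

object SharedAccumulatorDemo extends App {
  val original = FakeRelation(Seq("col1", "col2"), new LongAdder)
  original.materialize(80L)  // 10 rows * 2 INT columns * 4 bytes

  val projected = original.withOutput(Seq("col1"))
  // The projection is never materialized on its own (the column buffers are
  // already cached), so nothing is ever added for it; it simply inherits the
  // full table's 80 bytes instead of the 40 the test expects.
  println(projected.batchStats.sum())  // prints 80
}
{code}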

But that assumes I understand the problem correctly; since I'm not that 
familiar with the SQL code, could you confirm whether the above is actually a 
problem?

> InMemoryRelation doesn't scale to large tables
> ----------------------------------------------
>
>                 Key: SPARK-17549
>                 URL: https://issues.apache.org/jira/browse/SPARK-17549
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0
>            Reporter: Marcelo Vanzin
>            Assignee: Marcelo Vanzin
>             Fix For: 2.0.1, 2.1.0
>
>         Attachments: create_parquet.scala, example_1.6_post_patch.png, 
> example_1.6_pre_patch.png, spark-1.6-2.patch, spark-1.6.patch, spark-2.0.patch
>
>
> An {{InMemoryRelation}} is created when you cache a table; but if the table 
> is large, meaning it has a really large number of columns, a really large 
> number of partitions (in the file split sense, not the "table partition" 
> sense), or both, it causes an immense amount of memory to be used in the 
> driver.
> The reason is that it uses an accumulator to collect statistics about each 
> partition, and instead of summarizing the data in the driver, it keeps *all* 
> entries in memory.
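> To sketch the difference (a simplified illustration, not the actual 
> accumulator classes): the current approach keeps every per-partition stats 
> row on the driver, while a summarizing accumulator only ever holds a single 
> number:
> {code}
> import scala.collection.mutable.ArrayBuffer
> 
> object StatsAccumulatorSketch {
>   // Current behavior: every partition ships its full per-column stats row
>   // and the driver keeps all of them, so memory grows with
>   // (#partitions * #columns).
>   val allStats = ArrayBuffer[Array[Long]]()
>   def addPartitionStats(perColumnStats: Array[Long]): Unit = allStats += perColumnStats
> 
>   // Summarizing alternative: fold each partition's contribution into a
>   // single long, so driver memory stays constant no matter how many
>   // partitions report.
>   var totalSizeInBytes = 0L
>   def addPartitionSize(sizeInBytes: Long): Unit = totalSizeInBytes += sizeInBytes
> }
> {code}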
> I'm attaching a script I used to create a parquet file with 20,000 columns 
> and a single row, which I then copied 500 times so I'd have 500 partitions.
> When doing the following:
> {code}
> sqlContext.read.parquet(...).count()
> {code}
> Everything works fine, both in Spark 1.6 and 2.0. (It's super slow with the 
> settings I used, but it works.)
> I ran spark-shell like this:
> {code}
> ./bin/spark-shell --master 'local-cluster[4,1,4096]' --driver-memory 2g --conf spark.executor.memory=2g
> {code}
> And ran:
> {code}
> sqlContext.read.parquet(...).cache().count()
> {code}
> You'll see the results in screenshot {{example_1.6_pre_patch.png}}. After 40 
> partitions were processed, there were 40 GenericInternalRow objects with
> 100,000 items each (5 stat info fields * 20,000 columns). So, memory usage 
> was:
> {code}
>   40 * 100000 * (4 * 20 + 24) = 416000000 =~ 400MB
> {code}
> (Note: Integer = 20 bytes, Long = 24 bytes.)
> If I waited until the end, there would be 500 partitions, so ~ 5GB of memory 
> to hold the stats.
> I'm also attaching a patch I made on top of 1.6 that uses just a long 
> accumulator to capture the table size; with that patch memory usage on the 
> driver doesn't keep growing. Also note in the patch that I'm multiplying the 
> column size by the row count, which I think is a different bug in the 
> existing code (those stats should be for the whole batch, not just a single 
> row, right?). I also added {{example_1.6_post_patch.png}} to show the 
> {{InMemoryRelation}} with the patch.
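> As a rough illustration of the patch's accounting (not the patch itself; the 
> helper name is made up), each batch contributes its per-row column sizes 
> times its row count, and only the running long total ever lives on the driver:
> {code}
> object BatchSizeSketch {
>   // Hypothetical helper: one batch's contribution to the single size
>   // accumulator.
>   def batchSizeInBytes(perRowColumnSizes: Seq[Int], rowCount: Long): Long =
>     perRowColumnSizes.map(_.toLong).sum * rowCount
> 
>   // e.g. a batch of 1 row with 20,000 INT columns (4 bytes each):
>   // batchSizeInBytes(Seq.fill(20000)(4), 1L) == 80000L
> }
> {code}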
> I also applied a very similar patch on top of Spark 2.0. But there things 
> blow up even more spectacularly when I try to run the count on the cached 
> table. It starts with this error:
> {noformat}
> 14:19:43 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, vanzin-st1-3.gce.cloudera.com): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
> (lots of generated code here...)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
>       at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>       at java.util.ArrayList.get(ArrayList.java:411)
>       at org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
>       at org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
>       at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
>       at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
>       at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
>       at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:913)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:911)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:911)
>       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:883)
>       ... 54 more
> {noformat}
> Basically a lot of that kept repeating, making the output unreadable, so I 
> just killed the shell. Anyway, I believe the same fix should work there, but 
> I can't be sure because the test fails for what seem to be unrelated reasons.


