[ 
https://issues.apache.org/jira/browse/SPARK-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davies Liu resolved SPARK-14138.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 2.0.0

Issue resolved by pull request 12108
[https://github.com/apache/spark/pull/12108]

> Generated SpecificColumnarIterator code can exceed JVM size limit for cached 
> DataFrames
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-14138
>                 URL: https://issues.apache.org/jira/browse/SPARK-14138
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Sven Krasser
>             Fix For: 2.0.0
>
>
> The generated {{SpecificColumnarIterator}} code for wide DataFrames can 
> exceed the JVM 64k limit under certain circumstances. This snippet reproduces 
> the error in spark-shell (with 5G driver memory) by creating a new DataFrame 
> with >2000 aggregation-based columns:
> {code}
> val df = sc.parallelize(1 to 10).toDF()
> val aggr = {1 to 2260}.map(colnum => avg(df.col("_1")).as(s"col_$colnum"))
> val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache()
> res.show() // this will break
> {code}
> The following error is produced (pruned for brevity):
> {noformat}
> /* 001 */
> /* 002 */ import java.nio.ByteBuffer;
> /* 003 */ import java.nio.ByteOrder;
> /* 004 */ import scala.collection.Iterator;
> /* 005 */ import org.apache.spark.sql.types.DataType;
> /* 006 */ import 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
> /* 007 */ import 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
> /* 008 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
> /* 009 */
> /* 010 */ public SpecificColumnarIterator 
> generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
> /* 011 */   return new SpecificColumnarIterator();
> /* 012 */ }
> /* 013 */
> ...
> /* 9113 */     accessor2261.extractTo(mutableRow, 2261);
> /* 9114 */     unsafeRow.pointTo(bufferHolder.buffer, 2262, 
> bufferHolder.totalSize());
> /* 9115 */     return unsafeRow;
> /* 9116 */   }
> /* 9117 */ }
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555)
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575)
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572)
>       at 
> org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>       at 
> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>       ... 28 more
> Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "()Z" 
> of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator"
>  grows beyond 64 KB
>       at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
>       at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
>       at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
>       at org.codehaus.janino.UnitCompiler.invoke(UnitCompiler.java:10050)
>       at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4008)
>       at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185)
>       at 
> org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263)
>       at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
>       at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
>       at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
>       at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3927)
>       at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185)
>       at 
> org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263)
>       at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
>       at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
>       at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
>       at 
> org.codehaus.janino.UnitCompiler.invokeConstructor(UnitCompiler.java:6681)
>       at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4126)
>       at org.codehaus.janino.UnitCompiler.access$7600(UnitCompiler.java:185)
>       at 
> org.codehaus.janino.UnitCompiler$10.visitNewClassInstance(UnitCompiler.java:3275)
>       at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:4085)
>       at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
>       at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
>       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2669)
>       at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
>       at 
> org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
>       at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
>       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
>       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
>       at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
>       at 
> org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
>       at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
>       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
>       at 
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
>       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
>       at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822)
>       at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
>       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
>       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
>       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
>       at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
>       at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
>       at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
>       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>       at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
>       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
>       at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
>       at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
>       at 
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
>       at 
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
>       at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>       at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
>       at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
>       at 
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
>       at 
> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
>       at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
>       at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:550)
>       ... 32 more
> {noformat}
> Note that the issue does not occur (and the {{.show()}} call prints the right 
> results) when the number of aggregation columns is slightly reduced, 2250 
> instead of 2260 in this case:
> {code}
> val df = sc.parallelize(1 to 10).toDF()
> val aggr = {1 to 2250}.map(colnum => avg(df.col("_1")).as(s"col_$colnum")) // 
> only 2250
> val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache()
> res.show() // this will work
> {code}
> Also, if the final DataFrame is not cached, then it will also work for 2260 
> aggregations:
> {code}
> val df = sc.parallelize(1 to 10).toDF()
> val aggr = {1 to 2260}.map(colnum => avg(df.col("_1")).as(s"col_$colnum"))
> val res = df.groupBy("_1").agg(count("_1"), aggr: _*) // no .cache() call
> res.show() // this will work
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to