[ https://issues.apache.org/jira/browse/SPARK-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen reopened SPARK-16223: ------------------------------- Actually I think this is a duplicate > Codegen failure with a Dataframe program using an array > ------------------------------------------------------- > > Key: SPARK-16223 > URL: https://issues.apache.org/jira/browse/SPARK-16223 > Project: Spark > Issue Type: Bug > Components: SQL > Reporter: Kazuaki Ishizaki > > When we compile a Dataframe program with an operation on a large array, > compilation failure occurs. This is because a local variable > {{inputadapter_value}} cannot be referenced in the {{apply()}} method that is > generated by {{CodegenContext.splitExpressions()}}. The local variable is > defined in the {{processNext()}} method. > What is the better approach to resolve this? Is it better to pass > {{inputadapter_value}} to the {{apply()}} method? > Example program > {code} > val n = 500 > val statement = (0 to n - 1).map(i => s"value + 1.0d") > .mkString("Array(", ",", ")") > sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF > .selectExpr(statement).showString(1) > {code} > Generated code and stack trace > {code:java} > 23:10:45.801 ERROR > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to > compile: org.codehaus.commons.compiler.CompileException: File > 'generated.java', Line 30, Column 36: Expression "inputadapter_value" is not > an rvalue > /* 001 */ public Object generate(Object[] references) { > /* 002 */ return new GeneratedIterator(references); > /* 003 */ } > /* 004 */ > /* 005 */ final class GeneratedIterator extends > org.apache.spark.sql.execution.BufferedRowIterator { > /* 006 */ private Object[] references; > /* 007 */ private scala.collection.Iterator inputadapter_input; > /* 008 */ private Object[] project_values; > /* 009 */ private UnsafeRow project_result; > /* 010 */ private > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder; > /* 011 */ private > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter > project_rowWriter; > 
/* 012 */ private > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter > project_arrayWriter; > /* 013 */ > /* 014 */ public GeneratedIterator(Object[] references) { > /* 015 */ this.references = references; > /* 016 */ } > /* 017 */ > /* 018 */ public void init(int index, scala.collection.Iterator inputs[]) { > /* 019 */ partitionIndex = index; > /* 020 */ inputadapter_input = inputs[0]; > /* 021 */ this.project_values = null; > /* 022 */ project_result = new UnsafeRow(1); > /* 023 */ this.project_holder = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, > 32); > /* 024 */ this.project_rowWriter = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, > 1); > /* 025 */ this.project_arrayWriter = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); > /* 026 */ } > /* 027 */ > /* 028 */ private void project_apply_0(InternalRow inputadapter_row) { > /* 029 */ double project_value1 = -1.0; > /* 030 */ project_value1 = inputadapter_value + 1.0D; > /* 031 */ if (false) { > /* 032 */ project_values[0] = null; > /* 033 */ } else { > /* 034 */ project_values[0] = project_value1; > /* 035 */ } > /* 036 */ > /* 037 */ double project_value4 = -1.0; > /* 038 */ project_value4 = inputadapter_value + 1.0D; > /* 039 */ if (false) { > /* 040 */ project_values[1] = null; > /* 041 */ } else { > /* 042 */ project_values[1] = project_value4; > /* 043 */ } > ... 
> /* 4032 */ } > /* 4033 */ > /* 4034 */ protected void processNext() throws java.io.IOException { > /* 4035 */ while (inputadapter_input.hasNext()) { > /* 4036 */ InternalRow inputadapter_row = (InternalRow) > inputadapter_input.next(); > /* 4037 */ System.out.println("row: " + inputadapter_row.getClass() + > ", " + inputadapter_row); > /* 4038 */ double inputadapter_value = inputadapter_row.getDouble(0); > /* 4039 */ > /* 4040 */ final boolean project_isNull = false; > /* 4041 */ this.project_values = new > Object[500];project_apply_0(inputadapter_row); > /* 4042 */ project_apply_1(inputadapter_row); > /* 4043 */ /* final ArrayData project_value = > org.apache.spark.sql.catalyst.util.GenericArrayData.allocate(project_values); > */ > /* 4044 */ final ArrayData project_value = new > org.apache.spark.sql.catalyst.util.GenericArrayData(project_values); > /* 4045 */ this.project_values = null; > /* 4046 */ project_holder.reset(); > /* 4047 */ > /* 4048 */ project_rowWriter.zeroOutNullBytes(); > /* 4049 */ > /* 4050 */ if (project_isNull) { > /* 4051 */ project_rowWriter.setNullAt(0); > /* 4052 */ } else { > /* 4053 */ // Remember the current cursor so that we can calculate > how many bytes are > /* 4054 */ // written later. > /* 4055 */ final int project_tmpCursor = project_holder.cursor; > /* 4056 */ > /* 4057 */ if (project_value instanceof UnsafeArrayData) { > /* 4058 */ final int project_sizeInBytes = ((UnsafeArrayData) > project_value).getSizeInBytes(); > /* 4059 */ // grow the global buffer before writing data. 
> /* 4060 */ project_holder.grow(project_sizeInBytes); > /* 4061 */ ((UnsafeArrayData) > project_value).writeToMemory(project_holder.buffer, project_holder.cursor); > /* 4062 */ project_holder.cursor += project_sizeInBytes; > /* 4063 */ > /* 4064 */ } else { > /* 4065 */ final int project_numElements = > project_value.numElements(); > /* 4066 */ project_arrayWriter.initialize(project_holder, > project_numElements, 8); > /* 4067 */ > /* 4068 */ for (int project_index = 0; project_index < > project_numElements; project_index++) { > /* 4069 */ if (project_value.isNullAt(project_index)) { > /* 4070 */ project_arrayWriter.setNullAt(project_index); > /* 4071 */ } else { > /* 4072 */ final double project_element = > project_value.getDouble(project_index); > /* 4073 */ project_arrayWriter.write(project_index, > project_element); > /* 4074 */ } > /* 4075 */ } > /* 4076 */ } > /* 4077 */ > /* 4078 */ project_rowWriter.setOffsetAndSize(0, project_tmpCursor, > project_holder.cursor - project_tmpCursor); > /* 4079 */ project_rowWriter.alignToWords(project_holder.cursor - > project_tmpCursor); > /* 4080 */ } > /* 4081 */ project_result.setTotalSize(project_holder.totalSize()); > /* 4082 */ append(project_result); > /* 4083 */ if (shouldStop()) return; > /* 4084 */ } > /* 4085 */ } > /* 4086 */ } > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 30, Column 36: Expression "inputadapter_value" is not an rvalue > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10174) > at > org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6036) > at > org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4440) > at org.codehaus.janino.UnitCompiler.access$9900(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$11.visitAmbiguousName(UnitCompiler.java:4417) > at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3138) > at > 
org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427) > at > org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4498) > at org.codehaus.janino.UnitCompiler.access$8900(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$11.visitBinaryOperation(UnitCompiler.java:4394) > at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:3768) > at > org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4360) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2669) > at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619) > at org.codehaus.janino.Java$Assignment.accept(Java.java:3405) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643) > at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662) > at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350) > at > 
org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383) > at > org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315) > at > org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:878) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:903) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:900) > at > com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) > at > com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) > at > com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257) > at com.google.common.cache.LocalCache.get(LocalCache.java:4000) > at 
com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:832) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:351) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240) > at > org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323) > at > org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) > at > org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2176) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2525) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2175) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2182) > at > org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1918) > at > org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1917) > at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2555) > at org.apache.spark.sql.Dataset.head(Dataset.scala:1917) > at org.apache.spark.sql.Dataset.take(Dataset.scala:2132) > at 
org.apache.spark.sql.Dataset.showString(Dataset.scala:239) > at > org.apache.spark.sql.MySuite$$anonfun$1.apply$mcV$sp(MySuite.scala:254) > at org.apache.spark.sql.MySuite$$anonfun$1.apply(MySuite.scala:29) > at org.apache.spark.sql.MySuite$$anonfun$1.apply(MySuite.scala:29) > at > org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) > at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) > at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) > at org.scalatest.Transformer.apply(Transformer.scala:22) > at org.scalatest.Transformer.apply(Transformer.scala:20) > at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) > at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:57) > at > org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) > at > org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) > at > org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) > at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) > at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) > at org.scalatest.FunSuite.runTest(FunSuite.scala:1555) > at > org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) > at > org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) > at > org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) > at > org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) > at scala.collection.immutable.List.foreach(List.scala:381) > at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) > at > org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) > at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) > at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) > at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) > at org.scalatest.Suite$class.run(Suite.scala:1424) > at 
> org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) > at > org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) > at > org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) > at org.scalatest.SuperEngine.runImpl(Engine.scala:545) > at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) > at > org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:29) > at > org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) > at > org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) > at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:29) > at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) > at > org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) > at > org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) > at scala.collection.immutable.List.foreach(List.scala:381) > at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) > at > org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044) > at > org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043) > at > org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722) > at > org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043) > at org.scalatest.tools.Runner$.run(Runner.scala:883) > at org.scalatest.tools.Runner.run(Runner.scala) > at > org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) > at > org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org