-dev +user

It surprises me as `filter()` takes a Column, not a `Row => Boolean`.


There are several overloaded versions of `Dataset.filter(...)`:

def filter(func: FilterFunction[T]): Dataset[T]
def filter(func: (T) ⇒ Boolean): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def filter(condition: Column): Dataset[T]
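
The lambda in the question compiles because it matches the typed `func: (T) ⇒ Boolean` overload, with `T = Row` for a plain DataFrame; no implicit conversion to Column is involved. A minimal sketch (not Spark — the class and names below are made-up stand-ins that only mirror the overload shapes) shows the same resolution:

```scala
// Hypothetical stand-ins mirroring the two Dataset.filter overload shapes.
final case class Column(expr: String)

class MiniDataset[T](rows: Seq[T]) {
  // Column-based overload: in real Spark this stays in the Catalyst world.
  def filter(condition: Column): MiniDataset[T] = this // stub

  // Typed overload: runs an opaque Scala function on every element.
  def filter(func: T => Boolean): MiniDataset[T] =
    new MiniDataset(rows.filter(func))

  def count: Long = rows.size.toLong
}

object Demo extends App {
  val ds = new MiniDataset(Seq(1, 2, 2, 3))
  // The lambda is a Function1, so it selects the `T => Boolean` overload,
  // just as `df.filter(x => x.getAs[Int]("day") == 2)` does on a Dataset[Row].
  println(ds.filter((x: Int) => x == 2).count)
}
```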

... and why the error occurs.  Can someone explain please?


Any time the compiler fails like that, it is probably a Spark code
generation bug. It would be awesome if you could try your application on
Spark 2.0.1 (currently voting on RC3) and see if it's fixed. If not, please
open a JIRA.
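
On the speed difference: a Column predicate like `$"day" === 2` is a Catalyst expression, so Spark can code-generate it and push it into the scan, while a `Row => Boolean` lambda is opaque to the optimizer and must be invoked once per deserialized row (that is the `*Filter <function1>.apply` node and the empty `PushedFilters: []` in the plan at the end of the quoted message). A hedged sketch, assuming a running `SparkSession` named `spark` and the same Avro input path as in the question:

```scala
import org.apache.spark.sql.functions.col

val df = spark.read.format("com.databricks.spark.avro").load("/tmp/whatever/output")

// Fast: stays in the Column/Catalyst world, eligible for codegen and pushdown.
val fastCount = df.filter(col("day") === 2).count

// Slow: typed overload; the function is a black box run once per row.
val slowCount = df.filter(x => x.getAs[Int]("day") == 2).count
```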

Michael

On Thu, Sep 29, 2016 at 9:16 AM, Samy Dindane <s...@dindane.com> wrote:

> Hi,
>
> I noticed that the following code compiles:
>
>   val df = spark.read.format("com.databricks.spark.avro").load("/tmp/whatever/output")
>   val count = df.filter(x => x.getAs[Int]("day") == 2).count
>
> It surprises me as `filter()` takes a Column, not a `Row => Boolean`.
>
> Also, this code returns the right result, but takes 1m30 to run (while it
> takes less than 1 second when using `$"day" === 2`) and gives the error
> pasted in the bottom of this message.
>
> I was just wondering why it does work (implicit conversion?), why it is
> slow, and why the error occurs.
> Can someone explain please?
>
> Thank you,
>
> Samy
>
> --
>
> [error] org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 398, Column 41: Expression "scan_isNull10" is not an rvalue
> [error]         at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10174)
> [error]         at org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6036)
> [error]         at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4440)
> [error]         at org.codehaus.janino.UnitCompiler.access$9900(UnitCompiler.java:185)
> [error]         at org.codehaus.janino.UnitCompiler$11.visitAmbiguousName(UnitCompiler.java:4417)
> [error]         at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3138)
> [error]         at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427)
> [error]         at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4634)
> [error]         at org.codehaus.janino.UnitCompiler.access$8900(UnitCompiler.java:185)
> [error]         at org.codehaus.janino.UnitCompiler$11.visitBinaryOperation(UnitCompiler.java:4394)
> [error]         at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:3768)
> [error]         at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427)
> [error]         at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4360)
> [error]         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1845)
> [error]         at org.codehaus.janino.UnitCompiler.access$2000(UnitCompiler.java:185)
> [error]         at org.codehaus.janino.UnitCompiler$4.visitLocalVariableDeclarationStatement(UnitCompiler.java:945)
> [error]         at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:2508)
> [error]         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
> [error]         at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
> [error]         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
> [error]         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822)
> [error]         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
> [error]         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
> [error]         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
> [error]         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
> [error]         at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
> [error]         at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
> [error]         at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
> [error]         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
> [error]         at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
> [error]         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
> [error]         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
> [error]         at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
> [error]         at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
> [error]         at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
> [error]         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
> [error]         at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
> [error]         at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
> [error]         at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
> [error]         at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
> [error]         at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
> [error]         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
> [error]         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:883)
> [error]         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941)
> [error]         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938)
> [error]         at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
> [error]         at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
> [error]         at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
> [error]         at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
> [error]         at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
> [error]         at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> [error]         at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> [error]         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:837)
> [error]         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:350)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> [error]         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> [error]         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> [error]         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> [error]         at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
> [error]         at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:122)
> [error]         at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
> [error]         at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
> [error]         at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> [error]         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> [error]         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> [error]         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> [error]         at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
> [error]         at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138)
> [error]         at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> [error]         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> [error]         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> [error]         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> [error]         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> [error]         at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
> [error]         at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
> [error]         at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
> [error]         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> [error]         at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
> [error]         at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
> [error]         at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
> [error]         at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2217)
> [error]         at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2216)
> [error]         at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2545)
> [error]         at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
> [error]         at com.sam4m.kafkafsconnector.Foo$.delayedEndpoint$com$sam4m$kafkafsconnector$Foo$1(App.scala:92)
> [error]         at com.sam4m.kafkafsconnector.Foo$delayedInit$body.apply(App.scala:80)
> [error]         at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> [error]         at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> [error]         at scala.App$$anonfun$main$1.apply(App.scala:76)
> [error]         at scala.App$$anonfun$main$1.apply(App.scala:76)
> [error]         at scala.collection.immutable.List.foreach(List.scala:381)
> [error]         at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> [error]         at scala.App$class.main(App.scala:76)
> [error]         at com.sam4m.kafkafsconnector.Foo$.main(App.scala:80)
> [error]         at com.sam4m.kafkafsconnector.Foo.main(App.scala)
> [error] 16/09/29 17:49:49 WARN WholeStageCodegenExec: Whole-stage codegen disabled for this plan:
> [error]  *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#41L])
> [error] +- *Project
> [error]    +- *Filter <function1>.apply
> [error]       +- *Scan avro [minute#0,second#1,info#2,status#3,year#4,month#5,day#6,hour#7] Format: com.databricks.spark.avro.DefaultSource@5864e8bf, InputPaths: file:/tmp/k2d-tests/output, PushedFilters: [], ReadSchema: struct<minute:int,second:int,info:struct<date:bigint,statID:string,eventType:string,deviceAdverti...
>
>
