Oh, thanks for checking!

On Tue, Feb 14, 2017 at 12:32 PM, Xiao Li <gatorsm...@gmail.com> wrote:
> https://github.com/apache/spark/pull/16894
>
> Already backported to Spark 2.0
>
> Thanks!
>
> Xiao
>
> 2017-02-13 17:41 GMT-08:00 Takeshi Yamamuro <linguin....@gmail.com>:
>
>> cc: xiao
>>
>> IIUC, xiao's commit below fixed this issue in master:
>> https://github.com/apache/spark/commit/2eb093decb5e87a1ea71bbaa28092876a8c84996
>>
>> Is this fix worth backporting to the v2.0 branch?
>> I checked that I could reproduce it there:
>>
>> ---
>>
>> scala> Seq((1, "a"), (2, "b"), (3, null)).toDF("c0", "c1").write.mode("overwrite").parquet("/Users/maropu/Desktop/data")
>> scala> spark.read.parquet("/Users/maropu/Desktop/data").createOrReplaceTempView("t")
>> scala> val df = sql("SELECT c0 FROM t WHERE NOT(c1 IS NOT NULL)")
>> scala> df.explain(true)
>> == Parsed Logical Plan ==
>> 'Project ['c0]
>> +- 'Filter NOT isnotnull('c1)
>>    +- 'UnresolvedRelation `t`
>>
>> == Analyzed Logical Plan ==
>> c0: int
>> Project [c0#16]
>> +- Filter NOT isnotnull(c1#17)
>>    +- SubqueryAlias t
>>       +- Relation[c0#16,c1#17] parquet
>>
>> == Optimized Logical Plan ==
>> Project [c0#16]
>> +- Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
>>            ^^^^^^^^^^^^^^^^
>>    +- Relation[c0#16,c1#17] parquet
>>
>> == Physical Plan ==
>> *Project [c0#16]
>> +- *Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
>>    +- *BatchedScan parquet [c0#16,c1#17] Format: ParquetFormat,
>>       InputPaths: file:/Users/maropu/Desktop/data, PartitionFilters: [],
>>       PushedFilters: [IsNotNull(c1), Not(IsNotNull(c1))],
>>       ReadSchema: struct<c0:int,c1:string>
>>
>> scala> df.show
>> +---+
>> | c0|
>> +---+
>> +---+
>>
>> // maropu
>>
>> On Sun, Feb 12, 2017 at 10:01 AM, Everett Anderson <ever...@nuna.com.invalid> wrote:
>>
>>> On the plus side, looks like this may be fixed in 2.1.0:
>>>
>>> == Physical Plan ==
>>> *HashAggregate(keys=[], functions=[count(1)])
>>> +- Exchange SinglePartition
>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>>       +- *Project
>>>          +- *Filter NOT isnotnull(username#14)
>>>             +- *FileScan parquet [username#14] Batched: true, Format: Parquet,
>>>                Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [],
>>>                PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string>
>>>
>>> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com> wrote:
>>>
>>>> Bumping this thread.
>>>>
>>>> Translating "where not(username is not null)" into a filter of
>>>> [IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe bug.
>>>>
>>>> Spark 1.6.2:
>>>>
>>>> explain select count(*) from parquet_table where not( username is not null)
>>>>
>>>> == Physical Plan ==
>>>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
>>>> +- TungstenExchange SinglePartition, None
>>>>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
>>>>       +- Project
>>>>          +- Filter NOT isnotnull(username#1590)
>>>>             +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>,
>>>>                PushedFilters: [Not(IsNotNull(username))]
>>>>
>>>> Spark 2.0.2:
>>>>
>>>> explain select count(*) from parquet_table where not( username is not null)
>>>>
>>>> == Physical Plan ==
>>>> *HashAggregate(keys=[], functions=[count(1)])
>>>> +- Exchange SinglePartition
>>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>>>       +- *Project
>>>>          +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>>>>             +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat,
>>>>                InputPaths: <path to parquet>, PartitionFilters: [],
>>>>                PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>>>                ReadSchema: struct<username:string>
>>>>
>>>> Example to generate the above:
>>>>
>>>> // Create some fake data
>>>>
>>>> import org.apache.spark.sql.Row
>>>> import org.apache.spark.sql.Dataset
>>>> import org.apache.spark.sql.types._
>>>>
>>>> val rowsRDD = sc.parallelize(Seq(
>>>>   Row(1, "fred"),
>>>>   Row(2, "amy"),
>>>>   Row(3, null)))
>>>>
>>>> val schema = StructType(Seq(
>>>>   StructField("id", IntegerType, nullable = true),
>>>>   StructField("username", StringType, nullable = true)))
>>>>
>>>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>>>
>>>> val path = "SOME PATH HERE"
>>>>
>>>> data.write.mode("overwrite").parquet(path)
>>>>
>>>> val testData = sqlContext.read.parquet(path)
>>>>
>>>> testData.registerTempTable("filter_test_table")
>>>>
>>>> %sql
>>>> explain select count(*) from filter_test_table where not( username is not null)
>>>>
>>>> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <akosti...@nuna.com.invalid> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have an application where I’m filtering data with SparkSQL with simple WHERE
>>>>> clauses. I also want the ability to show the unmatched rows for any filter, and
>>>>> so am wrapping the previous clause in `NOT()` to get the inverse. Example:
>>>>>
>>>>> Filter: username is not null
>>>>> Inverse filter: NOT(username is not null)
>>>>>
>>>>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the inverse filter
>>>>> always returns zero results. It looks like this is a problem with how the filter is
>>>>> getting pushed down to Parquet. Specifically, the pushdown includes both the
>>>>> “is not null” filter, AND “not(is not null)”, which would obviously result in zero
>>>>> matches.
>>>>> An example below:
>>>>>
>>>>> pyspark:
>>>>> > x = spark.sql('select my_id from my_table where username is not null')
>>>>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>>>>
>>>>> > x.explain()
>>>>> == Physical Plan ==
>>>>> *Project [my_id#6L]
>>>>> +- *Filter isnotnull(username#91)
>>>>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91] Format: ParquetFormat,
>>>>>       InputPaths: s3://my-path/my.parquet, PartitionFilters: [],
>>>>>       PushedFilters: [IsNotNull(username)],
>>>>>       ReadSchema: struct<my_id:bigint,username:string>
>>>>>
>>>>> > y.explain()
>>>>> == Physical Plan ==
>>>>> *Project [my_id#6L]
>>>>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>>>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91] Format: ParquetFormat,
>>>>>       InputPaths: s3://my-path/my.parquet, PartitionFilters: [],
>>>>>       PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>>>>       ReadSchema: struct<my_id:bigint,username:string>
>>>>>
>>>>> Presently I’m working around this by using the new NOT EXISTS functionality in
>>>>> Spark 2, but that seems like overkill.
>>>>>
>>>>> Any help appreciated.
>>>>>
>>>>> Alexi Kostibas
>>>>> Engineering
>>>>> Nuna
>>>>> 650 Townsend Street, Suite 425
>>>>> San Francisco, CA 94103
>>
>> --
>> ---
>> Takeshi Yamamuro

--
---
Takeshi Yamamuro
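
For anyone hitting this on an affected 2.0.x build before the backport lands, below is a rough, untested sketch of the two rewrites discussed in the thread, reusing Everett's toy schema (the filter_test_table view with id and username columns). NOT(username IS NOT NULL) is semantically just username IS NULL, and the NOT EXISTS form is presumably what Alexi means by the Spark 2 workaround; whether either form actually avoids the contradictory pushed filters on a given build should be confirmed with explain().

// Spark 2.0.x spark-shell sketch; the names filter_test_table, id, and username
// come from Everett's example, everything else here is illustrative only.
import spark.implicits._

Seq((1, "fred"), (2, "amy"), (3, null))
  .toDF("id", "username")
  .createOrReplaceTempView("filter_test_table")

// 1) Write the inverse filter directly as IS NULL instead of NOT(... IS NOT NULL);
//    verify the PushedFilters with explain() on the build in question.
val inverseViaIsNull =
  spark.sql("SELECT id FROM filter_test_table WHERE username IS NULL")
inverseViaIsNull.explain(true)

// 2) The NOT EXISTS rewrite: anti-join the table against the rows matched by the
//    original filter, keyed on id, so any filter can be inverted the same way.
val inverseViaNotExists = spark.sql("""
  SELECT t.id
  FROM filter_test_table t
  WHERE NOT EXISTS (
    SELECT 1
    FROM filter_test_table s
    WHERE s.id = t.id AND s.username IS NOT NULL
  )
""")
inverseViaNotExists.show()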