https://github.com/apache/spark/pull/16894
Already backported to Spark 2.0

Thanks!

Xiao

2017-02-13 17:41 GMT-08:00 Takeshi Yamamuro <linguin....@gmail.com>:

> cc: xiao
>
> IIUC, Xiao's commit below fixed this issue in master:
> https://github.com/apache/spark/commit/2eb093decb5e87a1ea71bbaa28092876a8c84996
>
> Is this fix worth backporting to the v2.0 branch?
> I checked that I could reproduce it there:
>
> ---
>
> scala> Seq((1, "a"), (2, "b"), (3, null)).toDF("c0", "c1").write.mode("overwrite").parquet("/Users/maropu/Desktop/data")
> scala> spark.read.parquet("/Users/maropu/Desktop/data").createOrReplaceTempView("t")
> scala> val df = sql("SELECT c0 FROM t WHERE NOT(c1 IS NOT NULL)")
> scala> df.explain(true)
> == Parsed Logical Plan ==
> 'Project ['c0]
> +- 'Filter NOT isnotnull('c1)
>    +- 'UnresolvedRelation `t`
>
> == Analyzed Logical Plan ==
> c0: int
> Project [c0#16]
> +- Filter NOT isnotnull(c1#17)
>    +- SubqueryAlias t
>       +- Relation[c0#16,c1#17] parquet
>
> == Optimized Logical Plan ==
> Project [c0#16]
> +- Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
>            ^^^^^^^^^^^^^^^^
>    +- Relation[c0#16,c1#17] parquet
>
> == Physical Plan ==
> *Project [c0#16]
> +- *Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
>    +- *BatchedScan parquet [c0#16,c1#17] Format: ParquetFormat,
>       InputPaths: file:/Users/maropu/Desktop/data, PartitionFilters: [],
>       PushedFilters: [IsNotNull(c1), Not(IsNotNull(c1))],
>       ReadSchema: struct<c0:int,c1:string>
>
> scala> df.show
> +---+
> | c0|
> +---+
> +---+
>
> // maropu
>
> On Sun, Feb 12, 2017 at 10:01 AM, Everett Anderson <ever...@nuna.com.invalid> wrote:
>
>> On the plus side, looks like this may be fixed in 2.1.0:
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)])
>> +- Exchange SinglePartition
>>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>       +- *Project
>>          +- *Filter NOT isnotnull(username#14)
>>             +- *FileScan parquet [username#14] Batched: true, Format: Parquet,
>>                Location: InMemoryFileIndex[file:/tmp/test_table],
>>                PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
>>                ReadSchema: struct<username:string>
>>
>> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com> wrote:
>>
>>> Bumping this thread.
>>>
>>> Translating "where not(username is not null)" into a filter of
>>> [IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe bug.
>>>
>>> Spark 1.6.2:
>>>
>>> explain select count(*) from parquet_table where not(username is not null)
>>>
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
>>> +- TungstenExchange SinglePartition, None
>>>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
>>>       +- Project
>>>          +- Filter NOT isnotnull(username#1590)
>>>             +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>,
>>>                PushedFilters: [Not(IsNotNull(username))]
>>>
>>> Spark 2.0.2:
>>>
>>> explain select count(*) from parquet_table where not(username is not null)
>>>
>>> == Physical Plan ==
>>> *HashAggregate(keys=[], functions=[count(1)])
>>> +- Exchange SinglePartition
>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>>       +- *Project
>>>          +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>>>             +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat,
>>>                InputPaths: <path to parquet>, PartitionFilters: [],
>>>                PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>>                ReadSchema: struct<username:string>
>>>
>>> Example to generate the above:
>>>
>>> // Create some fake data
>>>
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.Dataset
>>> import org.apache.spark.sql.types._
>>>
>>> val rowsRDD = sc.parallelize(Seq(
>>>   Row(1, "fred"),
>>>   Row(2, "amy"),
>>>   Row(3, null)))
>>>
>>> val schema = StructType(Seq(
>>>   StructField("id", IntegerType, nullable = true),
>>>   StructField("username", StringType, nullable = true)))
>>>
>>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>>
>>> val path = "SOME PATH HERE"
>>>
>>> data.write.mode("overwrite").parquet(path)
>>>
>>> val testData = sqlContext.read.parquet(path)
>>>
>>> testData.registerTempTable("filter_test_table")
>>>
>>> %sql
>>> explain select count(*) from filter_test_table where not(username is not null)
>>>
>>> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <akosti...@nuna.com.invalid> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have an application where I'm filtering data with Spark SQL using
>>>> simple WHERE clauses. I also want the ability to show the unmatched rows
>>>> for any filter, and so am wrapping the previous clause in `NOT()` to get
>>>> the inverse. Example:
>>>>
>>>> Filter: username is not null
>>>> Inverse filter: NOT(username is not null)
>>>>
>>>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>>>> inverse filter always returns zero results. It looks like this is a problem
>>>> with how the filter is getting pushed down to Parquet. Specifically, the
>>>> pushdown includes both the "is not null" filter AND "not(is not null)",
>>>> which would obviously result in zero matches.
>>>> An example below:
>>>>
>>>> pyspark:
>>>> > x = spark.sql('select my_id from my_table where username is not null')
>>>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>>>
>>>> > x.explain()
>>>> == Physical Plan ==
>>>> *Project [my_id#6L]
>>>> +- *Filter isnotnull(username#91)
>>>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>>>       Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>>>       PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>>>>       ReadSchema: struct<my_id:bigint,username:string>
>>>>
>>>> > y.explain()
>>>> == Physical Plan ==
>>>> *Project [my_id#6L]
>>>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>>>       Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>>>       PartitionFilters: [],
>>>>       PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>>>       ReadSchema: struct<my_id:bigint,username:string>
>>>>
>>>> Presently I'm working around this by using the new NOT EXISTS
>>>> functionality in Spark 2, but that seems like overkill.
>>>>
>>>> Any help appreciated.
>>>>
>>>> Alexi Kostibas
>>>> Engineering
>>>> Nuna
>>>> 650 Townsend Street, Suite 425
>>>> San Francisco, CA 94103
>>>>
>>>
>>
>
> --
> ---
> Takeshi Yamamuro
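
For anyone who wants to check whether a particular Spark build is affected, below is a minimal spark-shell sketch distilled from the reproductions in this thread; the output path and view name are arbitrary placeholders. On an affected 2.0.x build the pushed filters contain both IsNotNull(c1) and Not(IsNotNull(c1)) and the query returns no rows; on a fixed build only Not(IsNotNull(c1)) is pushed and the row holding the null comes back.

// Minimal repro sketch distilled from the thread above (spark-shell, Spark 2.x).
// The path and view name are placeholders.
import spark.implicits._

val path = "/tmp/not_isnotnull_repro"

Seq((1, "a"), (2, "b"), (3, null)).toDF("c0", "c1")
  .write.mode("overwrite").parquet(path)

spark.read.parquet(path).createOrReplaceTempView("t")

val df = spark.sql("SELECT c0 FROM t WHERE NOT(c1 IS NOT NULL)")

// Affected builds show Filter (isnotnull(c1) && NOT isnotnull(c1)) and push both
// IsNotNull(c1) and Not(IsNotNull(c1)) down to the Parquet scan.
df.explain(true)

// Expected: one row (c0 = 3); affected builds return an empty result.
df.show()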
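
Alexi mentions falling back to the NOT EXISTS support added in Spark 2, but his actual query is not shown, so the rewrite below is only a hypothetical reconstruction using the my_table / my_id / username names from his pyspark example, and it assumes my_id uniquely identifies a row. The IS NULL rewrite at the end is an untested assumption about a possibly lighter workaround; verify it with explain() on the affected version before relying on it.

// Hypothetical reconstruction of the NOT EXISTS workaround mentioned above.
// Table and column names come from the pyspark example; my_id is assumed unique.
val inverseViaNotExists = spark.sql("""
  SELECT t.my_id
  FROM my_table t
  WHERE NOT EXISTS (
    SELECT 1 FROM my_table u
    WHERE u.my_id = t.my_id AND u.username IS NOT NULL
  )
""")

// Untested assumption: stating the inverse predicate positively as IS NULL
// may avoid the contradictory IsNotNull constraint entirely.
val inverseViaIsNull = spark.sql("SELECT my_id FROM my_table WHERE username IS NULL")
inverseViaIsNull.explain()  // verify PushedFilters no longer contains IsNotNull(username)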