[
https://issues.apache.org/jira/browse/SPARK-19586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Everett Anderson updated SPARK-19586:
-------------------------------------
Description:
Opening this as it's a somewhat serious issue in the 2.0.x tree, in case a 2.0.3 release is planned; it is already fixed in 2.1.0.
While the query below works in 1.6.2 and 2.1.0, 2.0.2 appears to have a significant filter optimization error.
Example:
{noformat}
// Create some fake data with one null username
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
  Row(1, "fred"),
  Row(2, "amy"),
  Row(3, null)))

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("username", StringType, nullable = true)))

// Round-trip through Parquet so the filter can be pushed down to the scan
val data = sqlContext.createDataFrame(rowsRDD, schema)
val path = "/tmp/test_data"
data.write.mode("overwrite").parquet(path)

val testData = sqlContext.read.parquet(path)
testData.registerTempTable("filter_test_table")
{noformat}
{noformat}
%sql
explain select count(*) from filter_test_table where not( username is not null)
{noformat}
or
{noformat}
spark.sql("select count(*) from filter_test_table where not( username is not
null)").explain
{noformat}
In 2.0.2, I'm seeing
{noformat}
== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
            +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<username:string>
{noformat}
which seems like both an impossible Filter and an impossible pushed filter: isnotnull(username) && NOT isnotnull(username) can never be true for any row, so the filter should match nothing.
In Spark 1.6.2 it was
{noformat}
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
      +- Project
         +- Filter NOT isnotnull(username#1590)
            +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]
{noformat}
and in 2.1.0 it's working again as
{noformat}
== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string>
{noformat}
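To see whether the bad plan actually changes the result (and not just the explain output), one can run the count itself; the sample data has exactly one row with a null username, so the correct answer is 1. The 2.0.2 behavior noted below is inferred from the contradictory filter rather than re-verified.
{noformat}
// The sample data contains exactly one row with a null username, so the correct
// count is 1. Under the contradictory 2.0.2 filter, the count would be expected
// to come back as 0 (inferred from the plan, not verified here).
spark.sql("select count(*) from filter_test_table where not( username is not null)").show()
{noformat}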
While it's easy to work around this interactively by removing the double negative, it's harder when the predicate is a programmatic construct.
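For reference, a minimal sketch of one programmatic workaround, assuming the testData DataFrame and column name from the example above: express the predicate directly as an IS NULL check so the optimizer never sees NOT(IsNotNull(username)).
{noformat}
// Workaround sketch (not verified on 2.0.2): state the intent as an explicit
// IS NULL predicate instead of a double negative.
import org.apache.spark.sql.functions.col

val nullUsernames = testData.filter(col("username").isNull)
nullUsernames.count()   // expected to be 1 for the sample data above

// Equivalent SQL form against the registered temp table:
spark.sql("select count(*) from filter_test_table where username is null").show()
{noformat}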
> Incorrect push down filter for double negative in SQL
> -----------------------------------------------------
>
> Key: SPARK-19586
> URL: https://issues.apache.org/jira/browse/SPARK-19586
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.2
> Reporter: Everett Anderson
> Fix For: 2.1.0