[ https://issues.apache.org/jira/browse/SPARK-25368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lev Katzav updated SPARK-25368:
-------------------------------
    Description: 
There is a breaking change in Spark 2.3 (I checked on 2.3.1 and 2.3.2-rc5).

The following code reproduces the problem
 (the example is a bit convoluted; I simplified it as much as possible
from my original code):
{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import spark.implicits._

case class Data(a: Option[Int], b: String, c: Option[String], d: String)

val df1 = spark.createDataFrame(Seq(
  Data(Some(1), "1", None, "1"),
  Data(None, "2", Some("2"), "2")
))

// rows where a is not null, plus a null string column e
val df2 = df1
  .where($"a".isNotNull)
  .withColumn("e", lit(null).cast("string"))

val columns = df2.columns.map(col)

// same schema and column order as df2, but a, b and d are all null
val df3 = df1
  .select($"c", $"b" as "e")
  .withColumn("a", lit(null).cast("int"))
  .withColumn("b", lit(null).cast("string"))
  .withColumn("d", lit(null).cast("string"))
  .select(columns: _*)

val df4 = df2.union(df3)
  .withColumn("e", last($"e", ignoreNulls = true)
    .over(Window.partitionBy($"c").orderBy($"d")))
  .filter($"a".isNotNull)

df4.show()
{code}
 

Notice that the last operation applied to df4 filters out rows where a is null.

In Spark 2.2.1, the above code prints:
{code:java}
+---+---+----+---+---+
|  a|  b|   c|  d|  e|
+---+---+----+---+---+
|  1|  1|null|  1|  1|
+---+---+----+---+---+
{code}
In Spark 2.3.x, it prints:
{code:java}
+----+----+----+----+---+
|   a|   b|   c|   d|  e|
+----+----+----+----+---+
|null|null|null|null|  1|
|   1|   1|null|   1|  1|
|null|null|   2|null|  2|
+----+----+----+----+---+
{code}
The column a still contains null values.
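A possible workaround (an assumption on my side, not a verified fix) is to truncate the lineage before the final filter, so the optimizer cannot push the predicate below the window. Dataset.localCheckpoint(), available since 2.3.0, cuts the logical plan at that point:

{code:java}
// Hypothetical workaround: localCheckpoint() truncates the logical plan,
// so the final filter should stay above the materialized window result
// instead of being pushed below the Window operator.
val df4Workaround = df2.union(df3)
  .withColumn("e", last($"e", ignoreNulls = true)
    .over(Window.partitionBy($"c").orderBy($"d")))
  .localCheckpoint()
  .filter($"a".isNotNull)
{code}

Note that localCheckpoint() materializes the data on the executors, so this trades correctness for extra memory.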

 

Attached are the plans.

In the parsed logical plan, the filter isnotnull('a) is at the top,
but in the optimized logical plan it has been pushed down below the window,
which is why rows with a null a survive into the result.
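The attached plans can be reproduced directly with the standard explain API, which prints the parsed, analyzed, and optimized logical plans along with the physical plan:

{code:java}
// Compare the position of Filter isnotnull(a#...) relative to the Window
// operator in the parsed vs. optimized logical plans.
df4.explain(true)
{code}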


> Incorrect predicate pushdown results in incorrect result
> --------------------------------------------------------
>
>                 Key: SPARK-25368
>                 URL: https://issues.apache.org/jira/browse/SPARK-25368
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.3.1, 2.3.2
>            Reporter: Lev Katzav
>            Priority: Blocker
>         Attachments: plan.txt
>
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
