Nan Zhu created SPARK-44517:
-------------------------------
Summary: first operator should respect the nullability of child
expression as well as ignoreNulls option
Key: SPARK-44517
URL: https://issues.apache.org/jira/browse/SPARK-44517
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.1, 3.4.0, 3.3.2, 3.2.4, 3.2.3, 3.3.1, 3.2.2, 3.3.0,
3.2.1, 3.2.0
Reporter: Nan Zhu
I found the following problem when using Spark recently:
{code:java}
import org.apache.spark.sql.types._
import spark.implicits._

val s = Seq((1.2, "s", 2.2)).toDF("v1", "v2", "v3")
val schema = StructType(Seq(
  StructField("v1", DoubleType, nullable = false),
  StructField("v2", StringType, nullable = true),
  StructField("v3", DoubleType, nullable = false)))
val df = spark.createDataFrame(s.rdd, schema)
// dropDuplicates is the call that triggers the nullability change described below
val inputDF = df.dropDuplicates("v3")
spark.sql("CREATE TABLE local.db.table (\n v1 DOUBLE NOT NULL,\n v2 STRING, v3 DOUBLE NOT NULL)")
inputDF.write.mode("overwrite").format("iceberg").save("local.db.table")
{code}
When I use the above code to write to Iceberg (I guess Delta Lake has the
same problem), I get a very confusing exception:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: v1: required double
  2: v2: optional string
  3: v3: required double
}
Provided schema:
table {
  1: v1: optional double
  2: v2: optional string
  3: v3: required double
}
{code}
Basically, it complains that v1 is a nullable column in `inputDF` above, which
is not allowed because we created the table with v1 as NOT NULL.
The confusing part is that if we check the schema of inputDF with
printSchema(), v1 is not nullable:
{noformat}
root
|-- v1: double (nullable = false)
|-- v2: string (nullable = true)
|-- v3: double (nullable = false){noformat}
Clearly, something changed v1's nullability unexpectedly!
After some debugging, I found that the cause is dropDuplicates("v3"). In the
optimization phase, the ReplaceDeduplicateWithAggregate rule replaces
Deduplicate with an Aggregate that groups on v3 and runs first() over all other
columns. However, the first() operator has nullable hard-coded to "true", which
is the source of the changed nullability of v1.
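One way to observe the change described above is to compare the schema of the analyzed plan with the schema of the optimized plan. The following is a minimal sketch, not part of the original reproduction: the object name FirstNullabilityCheck and the helper nullabilityByStage are hypothetical, and it assumes a local SparkSession with no Iceberg dependency. On the affected versions, the report suggests that v1 would show as non-nullable in the analyzed plan and nullable in the optimized plan.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

object FirstNullabilityCheck {
  // Hypothetical helper: for each column, returns
  // (name, nullable in analyzed plan, nullable in optimized plan).
  def nullabilityByStage(df: DataFrame): Seq[(String, Boolean, Boolean)] = {
    val analyzed = df.queryExecution.analyzed.schema
    // Look up optimized-plan fields by name rather than relying on column order.
    val optimized = df.queryExecution.optimizedPlan.schema.fields
      .map(f => f.name -> f.nullable).toMap
    analyzed.fields.toSeq.map { f =>
      (f.name, f.nullable, optimized.getOrElse(f.name, f.nullable))
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session is enough to inspect the plans.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("first-nullability-check")
      .getOrCreate()

    val schema = StructType(Seq(
      StructField("v1", DoubleType, nullable = false),
      StructField("v2", StringType, nullable = true),
      StructField("v3", DoubleType, nullable = false)))
    val rows = spark.sparkContext.parallelize(Seq(Row(1.2, "s", 2.2)))
    val inputDF = spark.createDataFrame(rows, schema).dropDuplicates("v3")

    nullabilityByStage(inputDF).foreach { case (name, a, o) =>
      println(s"$name: analyzed nullable=$a, optimized nullable=$o")
    }
    spark.stop()
  }
}
```

printSchema() reports the analyzed schema, which would explain why the mismatch is not visible there, while the write path appears to see the schema produced after optimization.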
This is very confusing behavior in Spark. Probably no one noticed it before
because nullability did not matter much without the newer table formats such as
Delta Lake and Iceberg, which enforce nullability checks on write. Now that
users are adopting them more and more, the problem has surfaced.