[
https://issues.apache.org/jira/browse/SPARK-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cheng Lian updated SPARK-15632:
-------------------------------
Description:
h1. Overview
Filter operations should never change the query plan schema. However, the Dataset
typed filter operation does introduce schema changes in some cases. Furthermore,
any of the following aspects of the schema may change:
# field order,
# field number,
# field data type,
# field name, and
# field nullability
This is mostly because we wrap the actual {{Filter}} operator with a
{{SerializeFromObject}}/{{DeserializeToObject}} pair (query plan fragment
illustrated below), which performs a bunch of magic tricks.
{noformat}
SerializeFromObject
  Filter
    DeserializeToObject
      <child-plan>
{noformat}
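This wrapping is easy to see from the shell. A minimal sketch (assuming a
{{spark-shell}} session, where {{spark.implicits._}} is already in scope):
{code}
case class A(b: Double, a: String)

val ds = Seq(A(1.0, "foo"), A(2.0, "bar")).toDS()
ds.filter(_.b > 1).explain(true)
// The analyzed plan prints the typed filter sandwiched between the
// serializer/deserializer pair, roughly:
//
//   SerializeFromObject [...]
//   +- Filter ...
//      +- DeserializeToObject ...
//         +- LocalRelation [...]
{code}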
h1. Reproduction
h2. Field order, field number, and field data type change
{code}
case class A(b: Double, a: String)
val data = Seq(
"{ 'a': 'foo', 'b': 1, 'c': 'extra' }",
"{ 'a': 'bar', 'b': 2, 'c': 'extra' }",
"{ 'a': 'bar', 'c': 'extra' }"
)
val df1 = spark.read.json(sc.parallelize(data))
df1.printSchema()
// root
// |-- a: string (nullable = true)
// |-- b: long (nullable = true)
// |-- c: string (nullable = true)
val ds1 = df1.as[A]
ds1.printSchema()
// root
// |-- a: string (nullable = true)
// |-- b: long (nullable = true)
// |-- c: string (nullable = true)
val ds2 = ds1.filter(_.b > 1) // <- Here comes the trouble maker
ds2.printSchema()
// root <- 1. Reordered `a` and `b`, and
// |-- b: double (nullable = true) 2. dropped `c`, and
// |-- a: string (nullable = true) 3. up-casted `b` from long to double
val df2 = ds2.toDF()
df2.printSchema()
// root <- (Same as above)
// |-- b: double (nullable = true)
// |-- a: string (nullable = true)
{code}
h3. Field order change
{{DeserializeToObject}} resolves the encoder deserializer expression by *name*.
Thus the field order in the input query plan doesn't matter.
h3. Field number change
Same as above: fields not referenced by the encoder are silently dropped while
resolving deserializer expressions by name.
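Both points can be seen in one go by shuffling the input columns first (a sketch,
continuing from the {{df1}} defined in the snippet above):
{code}
// Deserializer expressions are resolved by name, so reordering the input
// columns (and keeping the extra column `c`) changes nothing; fields the
// encoder doesn't know about are silently dropped once the typed filter kicks in
df1.select("c", "b", "a").as[A].filter(_.b > 1).printSchema()
// root
// |-- b: double (nullable = true)
// |-- a: string (nullable = true)
{code}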
h3. Field data type change
When generating deserializer expressions, we allow "sane" implicit coercions
(e.g. integer to long, and long to double) by inserting {{UpCast}} operators.
Thus the actual field data types in the input query plan don't matter either, as
long as there are valid implicit coercions.
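The boundary is easy to hit (a sketch; {{Narrow}} is a hypothetical case class
used only for illustration):
{code}
// long -> double is a valid widening up-cast, so df1.as[A] resolves fine
// (as shown above). A narrowing target type is rejected at analysis time:
case class Narrow(b: Int, a: String)

// Throws an AnalysisException along the lines of
// "Cannot up cast `b` from bigint to int as it may truncate"
// df1.as[Narrow]
{code}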
h2. Field name and nullability change
{code}
val ds3 = spark.range(10)
ds3.printSchema()
// root
// |-- id: long (nullable = false)
val ds4 = ds3.filter(_ > 3)
ds4.printSchema()
// root
// |-- value: long (nullable = true) 4. Name changed from `id` to `value`, and
// 5. nullability changed from false to true
{code}
h3. Field name change
Primitive encoders like {{Encoder\[Long\]}} don't have a named field, so they
always have a single field with the hard-coded name "value". On the other hand,
when serializing domain objects back to rows, the schema of
{{SerializeFromObject}} is solely determined by the encoder. Thus the original
name {{id}} becomes {{value}}.
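The hard-coded name is visible directly on a primitive encoder (a sketch using
the public {{Encoders}} factory):
{code}
import org.apache.spark.sql.Encoders

// A flat/primitive encoder carries exactly one field, always named "value"
println(Encoders.scalaLong.schema)
// a single field named "value", LongType, nullable = false
{code}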
h3. Nullability change
[PR #11880|https://github.com/apache/spark/pull/11880] updated the return type of
{{SparkSession.range}} from {{Dataset\[Long\]}} to
{{Dataset\[java.lang.Long\]}} due to
[SI-4388|https://issues.scala-lang.org/browse/SI-4388]. As a consequence,
although the underlying {{Range}} operator produces non-nullable output, the
result encoder is nullable since {{java.lang.Long}} is nullable. Thus, we
observe a nullability change after typed filtering because the serializer
expression is derived from the encoder rather than from the query plan.
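For comparison with {{Encoders.scalaLong}} above, the boxed encoder that
{{SparkSession.range}} now carries reports a nullable field (a sketch):
{code}
import org.apache.spark.sql.Encoders

// Encoder for java.lang.Long: the single "value" field is nullable, and this is
// the schema SerializeFromObject reports after the typed filter
println(Encoders.LONG.schema)
// a single field named "value", LongType, nullable = true
{code}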
was:
Filter operations should never change the query plan schema. However, the Dataset
typed filter operation does introduce schema changes:
{code}
case class A(b: Double, a: String)
val data = Seq(
"{ 'a': 'foo', 'b': 1, 'c': 'extra' }",
"{ 'a': 'bar', 'b': 2, 'c': 'extra' }",
"{ 'a': 'bar', 'c': 'extra' }"
)
val df1 = spark.read.json(sc.parallelize(data))
df1.printSchema()
// root
// |-- a: string (nullable = true)
// |-- b: long (nullable = true)
// |-- c: string (nullable = true)
val ds1 = df1.as[A]
ds1.printSchema()
// root
// |-- a: string (nullable = true)
// |-- b: long (nullable = true)
// |-- c: string (nullable = true)
val ds2 = ds1.filter(_.b > 1) // <- Here comes the trouble maker
ds2.printSchema()
// root <- 1. reordered `a` and `b`, and
// |-- b: double (nullable = true) 2. dropped `c`, and
// |-- a: string (nullable = true) 3. up-casted `b` from long to double
val df2 = ds2.toDF()
df2.printSchema()
// root <- (Same as above)
// |-- b: double (nullable = true)
// |-- a: string (nullable = true)
{code}
This is because we wrap the actual {{Filter}} operator with a
{{SerializeFromObject}}/{{DeserializeToObject}} pair.
{{DeserializeToObject}} does a bunch of magic tricks here:
# Field order change
#- {{DeserializeToObject}} resolves the encoder deserializer expression by
*name*. Thus the field order in the input query plan doesn't matter.
# Field number change
#- Same as above: fields not referenced by the encoder are silently dropped while
resolving deserializer expressions by name.
# Field data type change
#- When generating deserializer expressions, we allow "sane" implicit coercions
(e.g. integer to long, and long to double) by inserting {{UpCast}} operators.
Thus the actual field data types in the input query plan don't matter either, as
long as there are valid implicit coercions.
Actually, even field names may change once [PR
#13269|https://github.com/apache/spark/pull/13269] gets merged, because it
introduces case-insensitive encoder resolution.
> Dataset typed filter operation changes query plan schema
> --------------------------------------------------------
>
> Key: SPARK-15632
> URL: https://issues.apache.org/jira/browse/SPARK-15632
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Cheng Lian
>