This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 66e1231 [SPARK-37859][SQL] Do not check for metadata during schema comparison
66e1231 is described below
commit 66e1231621fc0721a8b1a5e77a8ff84b8e665ea8
Author: Karen Feng <[email protected]>
AuthorDate: Fri Jan 14 10:54:53 2022 +0800
[SPARK-37859][SQL] Do not check for metadata during schema comparison
### What changes were proposed in this pull request?
Ignores the metadata when comparing the user-provided schema and the actual
schema during BaseRelation resolution.
### Why are the changes needed?
Makes it possible to read tables with Spark 3.2 that were written with
Spark 3.1, as
https://github.com/apache/spark/blob/bd24b4884b804fc85a083f82b864823851d5980c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L312
added a new metadata field that broke compatibility.
### Does this PR introduce _any_ user-facing change?
Yes. Previously, an error was thrown when a SQL table written with JDBC in
Spark 3.1 was read in Spark 3.2. Now, no error is thrown.
### How was this patch tested?
Unit test and manual test with a SQL table written with Spark 3.1.
Query:
```
select * from jdbc_table
```
Before:
```
org.apache.spark.sql.AnalysisException: The user-specified schema doesn't match the actual schema:
```
After: no error
Closes #35158 from karenfeng/SPARK-37859.
Authored-by: Karen Feng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a1e86373253d77329b2b252c653a69ae8ac0bd6c)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/datasources/DataSource.scala | 2 +-
.../apache/spark/sql/sources/TableScanSuite.scala | 2 +-
.../sql/test/DataFrameReaderWriterSuite.scala | 25 +++++++++++++++++++---
3 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ad850cf..1f6269c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -353,7 +353,7 @@ case class DataSource(
case (dataSource: RelationProvider, Some(schema)) =>
val baseRelation =
dataSource.createRelation(sparkSession.sqlContext,
caseInsensitiveOptions)
- if (baseRelation.schema != schema) {
+ if (!DataType.equalsIgnoreCompatibleNullability(baseRelation.schema, schema)) {
throw
QueryCompilationErrors.userSpecifiedSchemaMismatchActualSchemaError(
schema, baseRelation.schema)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index de54b38..47bacde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -359,7 +359,7 @@ class TableScanSuite extends DataSourceTest with SharedSparkSession {
val schemaNotMatch = intercept[Exception] {
sql(
s"""
- |CREATE $tableType relationProviderWithSchema (i int)
+ |CREATE $tableType relationProviderWithSchema (i string)
|USING org.apache.spark.sql.sources.SimpleScanSource
|OPTIONS (
| From '1',
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 41d1156..ea007c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -536,12 +536,31 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
.option("TO", "10")
.format("org.apache.spark.sql.sources.SimpleScanSource")
+ val answerDf = spark.range(1, 11).toDF()
+
// when users do not specify the schema
- checkAnswer(dfReader.load(), spark.range(1, 11).toDF())
+ checkAnswer(dfReader.load(), answerDf)
+
+ // same base schema, differing metadata and nullability
+ val fooBarMetadata = new MetadataBuilder().putString("foo", "bar").build()
+ val nullableAndMetadataCases = Seq(
+ (false, fooBarMetadata),
+ (false, Metadata.empty),
+ (true, fooBarMetadata),
+ (true, Metadata.empty))
+ nullableAndMetadataCases.foreach { case (nullable, metadata) =>
+ val inputSchema = new StructType()
+ .add("i", IntegerType, nullable = nullable, metadata = metadata)
+ checkAnswer(dfReader.schema(inputSchema).load(), answerDf)
+ }
// when users specify a wrong schema
- val inputSchema = new StructType().add("s", IntegerType, nullable = false)
- val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
+ var inputSchema = new StructType().add("s", IntegerType, nullable = false)
+ var e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
+ assert(e.getMessage.contains("The user-specified schema doesn't match the actual schema"))
+
+ inputSchema = new StructType().add("i", StringType, nullable = true)
+ e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
assert(e.getMessage.contains("The user-specified schema doesn't match the actual schema"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]