This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f8c87f03297 [SPARK-43380][SQL] Fix Avro data type conversion issues without causing performance regression
f8c87f03297 is described below
commit f8c87f03297e2770e2944e8e8fe097b75f9e8fea
Author: zeruibao <[email protected]>
AuthorDate: Wed Sep 27 16:42:35 2023 +0800
[SPARK-43380][SQL] Fix Avro data type conversion issues without causing performance regression
### What changes were proposed in this pull request?
My previous PR https://github.com/apache/spark/pull/41052 caused an Avro read performance regression because it changed the code structure, turning one match case into a nested match case. This PR fixes the Avro data type conversion issues in another way to avoid that regression.
Original Change:
We introduce the SQLConf `spark.sql.legacy.avro.allowReadingWithIncompatibleSchema` to prevent reading interval types as date or timestamp types (which would yield corrupt dates) and to prevent reading decimal types with incorrect precision.
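For illustration, here is a minimal sketch of how the flag behaves from a user's perspective (the file path and schema below are hypothetical; the config key is the one registered in `SQLConf.scala` by this patch):

```scala
// Default (false): a mismatched read fails fast with AVRO_INCOMPATIBLE_READ_TYPE
// instead of silently returning corrupt or null values.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "false")
// spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/example.avro").collect()  // throws

// Legacy behavior (true): the mismatched decimal is silently read back as null.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "true")
// spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/example.avro").collect()  // Array(Row(null))
```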
### Why are the changes needed?
We found the following issues with open source Avro:
- Interval types can be read as date or timestamp types, which leads to wildly different results. For example, `Duration.ofDays(1).plusSeconds(1)` is read back as `1972-09-27`, which is clearly wrong.
- Decimal types can be read with lower precision, which leads to the data being read as `null` instead of an error suggesting that a wider decimal type should be provided.
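A rough reproduction of the interval issue, assuming a spark-shell session (the output path below is just an example):

```scala
import java.time.Duration
import spark.implicits._  // needed for toDF on a local Seq

val path = "/tmp/interval_avro_example"  // hypothetical location
// Write a single day-time interval value out as Avro.
Seq(Duration.ofDays(1).plusSeconds(1)).toDF("a").write.format("avro").save(path)

// Reading the same file back as DATE used to silently return the bogus value
// shown above (1972-09-27); with this change it fails with
// AVRO_INCOMPATIBLE_READ_TYPE unless the legacy flag is set to true.
spark.read.schema("a DATE").format("avro").load(path).show()
```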
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests, plus new tests added to `AvroSuite`.
Closes #42503 from zeruibao/SPARK-4380-real-fix-regression.
Lead-authored-by: zeruibao <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-classes.json | 5 +
.../apache/spark/sql/avro/AvroDeserializer.scala | 46 ++++--
.../org/apache/spark/sql/avro/AvroSuite.scala | 158 +++++++++++++++++++++
docs/sql-error-conditions.md | 8 +-
docs/sql-migration-guide.md | 1 +
.../spark/sql/errors/QueryCompilationErrors.scala | 16 +++
.../org/apache/spark/sql/internal/SQLConf.scala | 12 ++
7 files changed, 235 insertions(+), 11 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 5d827c67482..dd0190c3462 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -75,6 +75,11 @@
}
}
},
+ "AVRO_INCOMPATIBLE_READ_TYPE" : {
+ "message" : [
+ "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: \"spark.sql.legacy.avro.allowIncompatibleSchema\"."
+ ]
+ },
"BATCH_METADATA_NOT_FOUND" : {
"message" : [
"Unable to find batch <batchMetadataFile>."
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index a78ee89a3e9..e82116eec1e 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.internal.LegacyBehaviorPolicy
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -117,6 +118,10 @@ private[sql] class AvroDeserializer(
val incompatibleMsg = errorPrefix +
s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"
+ val realDataType = SchemaConverters.toSqlType(avroType).dataType
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
+ val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
+
(avroType.getType, catalystType) match {
case (NULL, NullType) => (updater, ordinal, _) =>
updater.setNullAt(ordinal)
@@ -128,9 +133,19 @@ private[sql] class AvroDeserializer(
case (INT, IntegerType) => (updater, ordinal, value) =>
updater.setInt(ordinal, value.asInstanceOf[Int])
+ case (INT, dt: DatetimeType)
+ if preventReadingIncorrectType && realDataType.isInstanceOf[YearMonthIntervalType] =>
+ throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+ toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+
case (INT, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+ case (LONG, dt: DatetimeType)
+ if preventReadingIncorrectType && realDataType.isInstanceOf[DayTimeIntervalType] =>
+ throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+ toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+
case (LONG, LongType) => (updater, ordinal, value) =>
updater.setLong(ordinal, value.asInstanceOf[Long])
@@ -204,17 +219,30 @@ private[sql] class AvroDeserializer(
}
updater.set(ordinal, bytes)
- case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+ case (FIXED, dt: DecimalType) =>
val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
- val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
- val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
- updater.setDecimal(ordinal, decimal)
+ if (preventReadingIncorrectType &&
+ d.getPrecision - d.getScale > dt.precision - dt.scale) {
+ throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+ toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+ }
+ (updater, ordinal, value) =>
+ val bigDecimal =
+ decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+ val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+ updater.setDecimal(ordinal, decimal)
- case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
+ case (BYTES, dt: DecimalType) =>
val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
- val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
- val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
- updater.setDecimal(ordinal, decimal)
+ if (preventReadingIncorrectType &&
+ d.getPrecision - d.getScale > dt.precision - dt.scale) {
+ throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
+ toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
+ }
+ (updater, ordinal, value) =>
+ val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+ val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+ updater.setDecimal(ordinal, decimal)
case (RECORD, st: StructType) =>
// Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d22a2d36975..ffb0a49641b 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -32,6 +32,7 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
import org.apache.spark.TestUtils.assertExceptionMsg
@@ -814,6 +815,163 @@ abstract class AvroSuite
}
}
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of decimal type to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+ sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+ // With the flag disabled, we will throw an exception if there is a mismatch
+ withSQLConf(confKey -> "false") {
+ val e = intercept[SparkException] {
+ spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
+ }
+ ExceptionUtils.getRootCause(e) match {
+ case ex: AnalysisException =>
+ checkError(
+ exception = ex,
+ errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+ parameters = Map("avroPath" -> "field 'a'",
+ "sqlPath" -> "field 'a'",
+ "avroType" -> "decimal\\(12,10\\)",
+ "sqlType" -> "\"DECIMAL\\(4,3\\)\""),
+ matchPVals = true
+ )
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ // The following used to work, so it should still work with the flag enabled
+ checkAnswer(
+ spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+ Row(new java.math.BigDecimal("13.123"))
+ )
+ withSQLConf(confKey -> "true") {
+ // With the flag enabled, we return a null silently, which isn't great
+ checkAnswer(
+ spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
+ Row(null)
+ )
+ checkAnswer(
+ spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+ Row(new java.math.BigDecimal("13.123"))
+ )
+ }
+ }
+ }
+
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of DayTimeIntervalType to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+ val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
+ val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
+
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.format("avro").save(path.getCanonicalPath)
+
+ withSQLConf(confKey -> "false") {
+ Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+ val e = intercept[SparkException] {
+ spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+ }
+
+ ExceptionUtils.getRootCause(e) match {
+ case ex: AnalysisException =>
+ checkError(
+ exception = ex,
+ errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+ parameters = Map("avroPath" -> "field 'a'",
+ "sqlPath" -> "field 'a'",
+ "avroType" -> "interval day to second",
+ "sqlType" -> s""""$sqlType""""),
+ matchPVals = true
+ )
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ }
+
+ withSQLConf(confKey -> "true") {
+ // Allow conversion and do not need to check result
+ spark.read.schema("a Date").format("avro").load(path.toString)
+ spark.read.schema("a timestamp").format("avro").load(path.toString)
+ spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+ }
+ }
+ }
+
+ test("SPARK-43380: Fix Avro data type conversion" +
+ " of YearMonthIntervalType to avoid producing incorrect results") {
+ withTempPath { path =>
+ val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+ val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
+ val data = Seq(Row(java.time.Period.of(1, 1, 0)))
+
+ val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+ df.write.format("avro").save(path.getCanonicalPath)
+
+ withSQLConf(confKey -> "false") {
+ Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
+ val e = intercept[SparkException] {
+ spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
+ }
+
+ ExceptionUtils.getRootCause(e) match {
+ case ex: AnalysisException =>
+ checkError(
+ exception = ex,
+ errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+ parameters = Map("avroPath" -> "field 'a'",
+ "sqlPath" -> "field 'a'",
+ "avroType" -> "interval year to month",
+ "sqlType" -> s""""$sqlType""""),
+ matchPVals = true
+ )
+ case other =>
+ fail(s"Received unexpected exception", other)
+ }
+ }
+ }
+
+ withSQLConf(confKey -> "true") {
+ // Allow conversion and do not need to check result
+ spark.read.schema("a Date").format("avro").load(path.toString)
+ spark.read.schema("a timestamp").format("avro").load(path.toString)
+ spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
+ }
+ }
+ }
+
+ Seq(
+ "time-millis",
+ "time-micros",
+ "timestamp-micros",
+ "timestamp-millis",
+ "local-timestamp-millis",
+ "local-timestamp-micros"
+ ).foreach { timeLogicalType =>
+ test(s"converting $timeLogicalType type to long in avro") {
+ withTempPath { path =>
+ val df = Seq(100L)
+ .toDF("dt")
+ val avroSchema =
+ s"""
+ |{
+ | "type" : "record",
+ | "name" : "test_schema",
+ | "fields" : [
+ | {"name": "dt", "type": {"type": "long", "logicalType": "$timeLogicalType"}}
+ | ]
+ |}""".stripMargin
+ df.write.format("avro").option("avroSchema", avroSchema).save(path.getCanonicalPath)
+ checkAnswer(
+ spark.read.schema(s"dt long").format("avro").load(path.toString),
+ Row(100L))
+ }
+ }
+ }
+
test("converting some specific sparkSQL types to avro") {
withTempPath { tempDir =>
val testSchema = StructType(Seq(
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 38cfc28ba09..660a72dca7d 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -93,6 +93,12 @@ Invalid as-of join.
For more details see [AS_OF_JOIN](sql-error-conditions-as-of-join-error-class.html)
+### AVRO_INCOMPATIBLE_READ_TYPE
+
+SQLSTATE: none assigned
+
+Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original encoded data type is `<avroType>`, however you're trying to read the field as `<sqlType>`, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: "spark.sql.legacy.avro.allowIncompatibleSchema".
+
### BATCH_METADATA_NOT_FOUND
[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2210,5 +2216,3 @@ The operation `<operation>` requires a `<requiredType>`. But `<objectName>` is a
The `<functionName>` requires `<expectedNum>` parameters but the actual number is `<actualNum>`.
For more details see [WRONG_NUM_ARGS](sql-error-conditions-wrong-num-args-error-class.html)
-
-
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index a28f6fd284d..c5d09c19b24 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -36,6 +36,7 @@ license: |
- Since Spark 3.5, the `plan` field is moved from `AnalysisException` to `EnhancedAnalysisException`.
- Since Spark 3.5, `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` is enabled by default. To restore the previous behavior, set `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to `false`.
- Since Spark 3.5, the `array_insert` function is 1-based for negative indexes. It inserts new element at the end of input arrays for the index -1. To restore the previous behavior, set `spark.sql.legacy.negativeIndexInArrayInsert` to `true`.
+- Since Spark 3.5, the Avro will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`
## Upgrading from Spark SQL 3.3 to 3.4
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9d2b1225825..2f2341ab47f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3727,6 +3727,22 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
)
}
+ def avroIncompatibleReadError(
+ avroPath: String,
+ sqlPath: String,
+ avroType: String,
+ sqlType: String): Throwable = {
+ new AnalysisException(
+ errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+ messageParameters = Map(
+ "avroPath" -> avroPath,
+ "sqlPath" -> sqlPath,
+ "avroType" -> avroType,
+ "sqlType" -> toSQLType(sqlType)
+ )
+ )
+ }
+
def optionMustBeLiteralString(key: String): Throwable = {
new AnalysisException(
errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index aeef531dbcd..ed887fd5cb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4332,6 +4332,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
+ buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
+ .internal()
+ .doc("When set to false, if types in Avro are encoded in the same format, but " +
+ "the type in the Avro schema explicitly says that the data types are different, " +
+ "reject reading the data type in the format to avoid returning incorrect results. " +
+ "When set to true, it restores the legacy behavior of allow reading the data in the" +
+ " format, which may return incorrect results.")
+ .version("3.5.1")
+ .booleanConf
+ .createWithDefault(false)
+
val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]