This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new ab1443052347 [SPARK-46275] Protobuf: Return null in permissive mode when deserialization fails
ab1443052347 is described below
commit ab14430523473528bafa41d8f10bc33efbb74493
Author: Raghu Angadi <[email protected]>
AuthorDate: Fri Dec 8 16:40:27 2023 +0900
[SPARK-46275] Protobuf: Return null in permissive mode when deserialization fails
### What changes were proposed in this pull request?
This updates the behavior of the `from_protobuf()` built-in function when the underlying record fails to deserialize.
* **Current behavior**:
  * By default, this throws an error and the query fails. [This part is not changed in this PR]
  * When `mode` is set to 'PERMISSIVE', it returns a non-null struct with each of the inner fields set to null, e.g. `{ "field_a": null, "field_b": null }`.
  * This is not very convenient for users: they cannot tell whether the nulls came from a malformed record or from an input that itself contained nulls, and checking each field for null in a SQL query is tedious (imagine a query over a struct with 10 fields).
* **New behavior**
  * When `mode` is set to 'PERMISSIVE', it simply returns `null`; see the sketch below.
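As an illustration only (not part of this change), a minimal sketch of calling `from_protobuf()` in permissive mode; the `payload` column, `Person` message name, and descriptor file path are all hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.protobuf.functions.from_protobuf

// Hypothetical input: `df` has a binary column `payload` holding serialized
// protobuf messages of a (hypothetical) `Person` type.
def parsePersons(df: DataFrame): DataFrame =
  df.select(
    from_protobuf(
      df("payload"),
      "Person",                 // hypothetical message name
      "/path/to/person.desc",   // hypothetical descriptor file
      java.util.Collections.singletonMap("mode", "PERMISSIVE")
    ).as("person"))

// Before this change, a malformed record produced a non-null struct of
// all-null fields, e.g. { "field_a": null, "field_b": null }; after it,
// the `person` column itself is null for that row.
```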
### Why are the changes needed?
This makes it easier for users to detect and handle malformed records.
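For example (reusing the hypothetical `person` column from the sketch above), a single null check on the struct column is now enough:

```scala
import org.apache.spark.sql.DataFrame

// With the new behavior, one null check on the struct column separates
// malformed records from well-formed ones.
def splitMalformed(parsed: DataFrame): (DataFrame, DataFrame) =
  (parsed.filter(parsed("person").isNull),     // deserialization failed
   parsed.filter(parsed("person").isNotNull))  // parsed successfully
```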
### Does this PR introduce _any_ user-facing change?
Yes, but this does not change the contract. In fact, it clarifies it.
### How was this patch tested?
- Unit tests are updated.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44214 from rangadi/protobuf-null.
Authored-by: Raghu Angadi <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 309c796876f310f8604292d84acc12e711ba7031)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/protobuf/ProtobufDataToCatalyst.scala | 31 ++++------------------
.../ProtobufCatalystDataConversionSuite.scala | 13 +--------
2 files changed, 6 insertions(+), 38 deletions(-)
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index 5c4a5ff06896..d2417674837b 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -22,12 +22,12 @@ import scala.util.control.NonFatal
import com.google.protobuf.DynamicMessage
import com.google.protobuf.TypeRegistry
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
-import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
private[sql] case class ProtobufDataToCatalyst(
child: Expression,
@@ -39,16 +39,8 @@ private[sql] case class ProtobufDataToCatalyst(
override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
- override lazy val dataType: DataType = {
- val dt = SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType
- parseMode match {
- // With PermissiveMode, the output Catalyst row might contain columns of null values for
- // corrupt records, even if some of the columns are not nullable in the user-provided schema.
- // Therefore we force the schema to be all nullable here.
- case PermissiveMode => dt.asNullable
- case _ => dt
- }
- }
+ override lazy val dataType: DataType =
+ SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType
override def nullable: Boolean = true
@@ -87,22 +79,9 @@ private[sql] case class ProtobufDataToCatalyst(
mode
}
- @transient private lazy val nullResultRow: Any = dataType match {
- case st: StructType =>
- val resultRow = new SpecificInternalRow(st.map(_.dataType))
- for (i <- 0 until st.length) {
- resultRow.setNullAt(i)
- }
- resultRow
-
- case _ =>
- null
- }
-
private def handleException(e: Throwable): Any = {
parseMode match {
- case PermissiveMode =>
- nullResultRow
+ case PermissiveMode => null
case FailFastMode =>
throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e)
case _ =>
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index b7f17fece5fa..62d0efd7459b 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -79,20 +79,9 @@ class ProtobufCatalystDataConversionSuite
.eval()
}
- val expected = {
- val expectedSchema = ProtobufUtils.buildDescriptor(descBytes, badSchema)
- SchemaConverters.toSqlType(expectedSchema).dataType match {
- case st: StructType =>
- Row.fromSeq((0 until st.length).map { _ =>
- null
- })
- case _ => null
- }
- }
-
checkEvaluation(
ProtobufDataToCatalyst(binary, badSchema, Some(descBytes), Map("mode" -> "PERMISSIVE")),
- expected)
+ expected = null)
}
protected def prepareExpectedResult(expected: Any): Any = expected match {