This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new ab1443052347 [SPARK-46275] Protobuf: Return null in permissive mode 
when deserialization fails
ab1443052347 is described below

commit ab14430523473528bafa41d8f10bc33efbb74493
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Fri Dec 8 16:40:27 2023 +0900

    [SPARK-46275] Protobuf: Return null in permissive mode when deserialization 
fails
    
    ### What changes were proposed in this pull request?
    This updates the the behavior of `from_protobuf()` built function when 
underlying record fails to deserialize.
    
      * **Current behvior**:
        * By default, this would throw an error and the query fails. [This part 
is not changed in the PR]
        * When `mode` is set to 'PERMISSIVE' it returns a non-null struct with 
each of the inner fields set to null e.g. `{ "field_a": null, "field_b": null 
}`  etc.
           * This is not very convenient to the users. They don't know if this 
was due to malformed record or if the input itself has null. It is very hard to 
check for each field for null in SQL query (imagine a sql query with a struct 
that has 10 fields).
    
      * **New behavior**
        * When `mode` is set to 'PERMISSIVE' it simply returns `null`.
    
    ### Why are the changes needed?
    This makes it easier for users to detect and handle malformed records.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, but this does not change the contract. In fact, it clarifies it.
    
    ### How was this patch tested?
     - Unit tests are updated.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44214 from rangadi/protobuf-null.
    
    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 309c796876f310f8604292d84acc12e711ba7031)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/protobuf/ProtobufDataToCatalyst.scala      | 31 ++++------------------
 .../ProtobufCatalystDataConversionSuite.scala      | 13 +--------
 2 files changed, 6 insertions(+), 38 deletions(-)

diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index 5c4a5ff06896..d2417674837b 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -22,12 +22,12 @@ import scala.util.control.NonFatal
 import com.google.protobuf.DynamicMessage
 import com.google.protobuf.TypeRegistry
 
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, SpecificInternalRow, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, 
PermissiveMode}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, 
SchemaConverters}
-import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, 
StructType}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
 
 private[sql] case class ProtobufDataToCatalyst(
     child: Expression,
@@ -39,16 +39,8 @@ private[sql] case class ProtobufDataToCatalyst(
 
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
 
-  override lazy val dataType: DataType = {
-    val dt = SchemaConverters.toSqlType(messageDescriptor, 
protobufOptions).dataType
-    parseMode match {
-      // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
-      // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
-      // Therefore we force the schema to be all nullable here.
-      case PermissiveMode => dt.asNullable
-      case _ => dt
-    }
-  }
+  override lazy val dataType: DataType =
+    SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType
 
   override def nullable: Boolean = true
 
@@ -87,22 +79,9 @@ private[sql] case class ProtobufDataToCatalyst(
     mode
   }
 
-  @transient private lazy val nullResultRow: Any = dataType match {
-    case st: StructType =>
-      val resultRow = new SpecificInternalRow(st.map(_.dataType))
-      for (i <- 0 until st.length) {
-        resultRow.setNullAt(i)
-      }
-      resultRow
-
-    case _ =>
-      null
-  }
-
   private def handleException(e: Throwable): Any = {
     parseMode match {
-      case PermissiveMode =>
-        nullResultRow
+      case PermissiveMode => null
       case FailFastMode =>
         throw 
QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e)
       case _ =>
diff --git 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index b7f17fece5fa..62d0efd7459b 100644
--- 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -79,20 +79,9 @@ class ProtobufCatalystDataConversionSuite
         .eval()
     }
 
-    val expected = {
-      val expectedSchema = ProtobufUtils.buildDescriptor(descBytes, badSchema)
-      SchemaConverters.toSqlType(expectedSchema).dataType match {
-        case st: StructType =>
-          Row.fromSeq((0 until st.length).map { _ =>
-            null
-          })
-        case _ => null
-      }
-    }
-
     checkEvaluation(
       ProtobufDataToCatalyst(binary, badSchema, Some(descBytes), Map("mode" -> 
"PERMISSIVE")),
-      expected)
+      expected = null)
   }
 
   protected def prepareExpectedResult(expected: Any): Any = expected match {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to