This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b1677a425408 [SPARK-48545][SQL] Create to_avro and from_avro SQL 
functions to match DataFrame equivalents
b1677a425408 is described below

commit b1677a4254088bbe851170884d8364b730741826
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Fri Jun 21 12:12:04 2024 -0700

    [SPARK-48545][SQL] Create to_avro and from_avro SQL functions to match 
DataFrame equivalents
    
    ### What changes were proposed in this pull request?
    
    This PR creates two new SQL functions "to_avro" and "from_avro" to match 
existing DataFrame equivalents.
    
    For example:
    
    ```
    sql(
      """
        |create table t as
        |  select named_struct('u', named_struct('member0', member0, 'member1', 
member1)) as s
        |  from values (1, null), (null,  'a') tab(member0, member1)
        |""".stripMargin)
    
    val jsonFormatSchema =
      """
        |{
        |  "type": "record",
        |  "name": "struct",
        |  "fields": [{
        |    "name": "u",
        |    "type": ["int","string"]
        |  }]
        |}
        |""".stripMargin
    
    spark.sql(
      s"""
        |select from_avro(result, '$jsonFormatSchema', map()).u from (
        |  select to_avro(s, '$jsonFormatSchema') as result from t
        |)""".stripMargin)
      .collect()
    
    > {1, NULL}
      {NULL, "a"}
    ```
    
    ### Why are the changes needed?
    
    This brings parity between SQL and DataFrame APIs in Apache Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, see above.
    
    ### How was this patch tested?
    
    This PR adds extra unit tests, and I also checked that the functions work 
with `spark-shell`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No GitHub copilot usage this time
    
    Closes #46977 from dtenedor/from-avro.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala |  83 +++++++++-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   6 +-
 .../expressions/toFromAvroSqlFunctions.scala       | 175 +++++++++++++++++++++
 .../apache/spark/sql/ExpressionsSchemaSuite.scala  |   4 +
 .../sql/expressions/ExpressionInfoSuite.scala      |   3 +
 sql/gen-sql-functions-docs.py                      |   3 +-
 6 files changed, 271 insertions(+), 3 deletions(-)

diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index d16ddb497320..c807685db0f0 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -26,7 +26,7 @@ import org.apache.avro.generic.{GenericDatumWriter, 
GenericRecord, GenericRecord
 import org.apache.avro.io.EncoderFactory
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.functions.{col, lit, struct}
 import org.apache.spark.sql.internal.SQLConf
@@ -286,4 +286,85 @@ class AvroFunctionsSuite extends QueryTest with 
SharedSparkSession {
       assert(msg.contains("Invalid default for field id: null not a \"long\""))
     }
   }
+
+  test("SPARK-48545: from_avro and to_avro SQL functions") {
+    withTable("t") {
+      sql(
+        """
+          |create table t as
+          |  select named_struct('u', named_struct('member0', member0, 
'member1', member1)) as s
+          |  from values (1, null), (null,  'a') tab(member0, member1)
+          |""".stripMargin)
+      val jsonFormatSchema =
+        """
+          |{
+          |  "type": "record",
+          |  "name": "struct",
+          |  "fields": [{
+          |    "name": "u",
+          |    "type": ["int","string"]
+          |  }]
+          |}
+          |""".stripMargin
+      val toAvroSql =
+        s"""
+           |select to_avro(s, '$jsonFormatSchema') as result from t
+           |""".stripMargin
+      val avroResult = spark.sql(toAvroSql).collect()
+      assert(avroResult != null)
+      checkAnswer(
+        spark.sql(s"select from_avro(result, '$jsonFormatSchema', map()).u 
from ($toAvroSql)"),
+        Seq(Row(Row(1, null)),
+          Row(Row(null, "a"))))
+
+      // Negative tests.
+      checkError(
+        exception = intercept[AnalysisException](sql(
+          s"""
+             |select to_avro(s, 42) as result from t
+             |""".stripMargin)),
+        errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+        parameters = Map("sqlExpr" -> "\"toavro(s, 42)\"",
+          "msg" -> ("The second argument of the TO_AVRO SQL function must be a 
constant string " +
+            "containing the JSON representation of the schema to use for 
converting the value to " +
+            "AVRO format"),
+          "hint" -> ""),
+        queryContext = Array(ExpectedContext(
+          fragment = "to_avro(s, 42)",
+          start = 8,
+          stop = 21)))
+      checkError(
+        exception = intercept[AnalysisException](sql(
+          s"""
+             |select from_avro(s, 42, '') as result from t
+             |""".stripMargin)),
+        errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+        parameters = Map("sqlExpr" -> "\"fromavro(s, 42, )\"",
+          "msg" -> ("The second argument of the FROM_AVRO SQL function must be 
a constant string " +
+            "containing the JSON representation of the schema to use for 
converting the value " +
+            "from AVRO format"),
+          "hint" -> ""),
+        queryContext = Array(ExpectedContext(
+          fragment = "from_avro(s, 42, '')",
+          start = 8,
+          stop = 27)))
+      checkError(
+        exception = intercept[AnalysisException](sql(
+          s"""
+             |select from_avro(s, '$jsonFormatSchema', 42) as result from t
+             |""".stripMargin)),
+        errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+        parameters = Map(
+          "sqlExpr" ->
+            s"\"fromavro(s, $jsonFormatSchema, 42)\"".stripMargin,
+          "msg" -> ("The third argument of the FROM_AVRO SQL function must be 
a constant map of " +
+            "strings to strings containing the options to use for converting 
the value " +
+            "from AVRO format"),
+          "hint" -> ""),
+        queryContext = Array(ExpectedContext(
+          fragment = s"from_avro(s, '$jsonFormatSchema', 42)",
+          start = 8,
+          stop = 138)))
+    }
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 3a418497fa53..20da1c030b53 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -860,7 +860,11 @@ object FunctionRegistry {
     // Xml
     expression[XmlToStructs]("from_xml"),
     expression[SchemaOfXml]("schema_of_xml"),
-    expression[StructsToXml]("to_xml")
+    expression[StructsToXml]("to_xml"),
+
+    // Avro
+    expression[FromAvro]("from_avro"),
+    expression[ToAvro]("to_avro")
   )
 
   val builtin: SimpleFunctionRegistry = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
new file mode 100644
index 000000000000..507511a36007
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{MapType, NullType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Converts a binary column of Avro format into its corresponding Catalyst 
value.
+ * This is a thin wrapper over the [[AvroDataToCatalyst]] class to create a 
SQL function.
+ *
+ * @param child the Catalyst binary input column.
+ * @param jsonFormatSchema the Avro schema in JSON string format.
+ * @param options the options to use when performing the conversion.
+ *
+ * @since 4.0.0
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(child, jsonFormatSchema, options) - Converts a binary Avro value 
into a Catalyst value.
+    """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ 
"name": "u", "type": ["int","string"] }]}', map()) IS NULL AS result FROM 
(SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) 
AS s FROM VALUES (1, NULL), (NULL,  'a') tab(member0, member1));
+       [false]
+  """,
+  note = """
+    The specified schema must match actual schema of the read data, otherwise 
the behavior
+    is undefined: it may fail or return arbitrary result.
+    To deserialize the data with a compatible and evolved schema, the expected 
Avro schema can be
+    set via the corresponding option.
+  """,
+  group = "misc_funcs",
+  since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: 
Expression)
+  extends TernaryExpression with RuntimeReplaceable {
+  override def first: Expression = child
+  override def second: Expression = jsonFormatSchema
+  override def third: Expression = options
+
+  override def withNewChildrenInternal(
+      newFirst: Expression, newSecond: Expression, newThird: Expression): 
Expression = {
+    copy(child = newFirst, jsonFormatSchema = newSecond, options = newThird)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val schemaCheck = jsonFormatSchema.dataType match {
+      case _: StringType |
+           _: NullType
+        if jsonFormatSchema.foldable =>
+        None
+      case _ =>
+        Some(TypeCheckResult.TypeCheckFailure(
+          "The second argument of the FROM_AVRO SQL function must be a 
constant string " +
+            "containing the JSON representation of the schema to use for 
converting the value " +
+            "from AVRO format"))
+    }
+    val optionsCheck = options.dataType match {
+      case MapType(StringType, StringType, _) |
+           MapType(NullType, NullType, _) |
+           _: NullType
+        if options.foldable =>
+        None
+      case _ =>
+        Some(TypeCheckResult.TypeCheckFailure(
+          "The third argument of the FROM_AVRO SQL function must be a constant 
map of strings to " +
+            "strings containing the options to use for converting the value 
from AVRO format"))
+    }
+    schemaCheck.getOrElse(
+      optionsCheck.getOrElse(
+        TypeCheckResult.TypeCheckSuccess))
+  }
+
+  override def replacement: Expression = {
+    val schemaValue: String = jsonFormatSchema.eval() match {
+      case s: UTF8String =>
+        s.toString
+      case null =>
+        ""
+    }
+    val optionsValue: Map[String, String] = options.eval() match {
+      case a: ArrayBasedMapData if a.keyArray.array.nonEmpty =>
+        val keys: Array[String] = a.keyArray.array.map(_.toString)
+        val values: Array[String] = a.valueArray.array.map(_.toString)
+        keys.zip(values).toMap
+      case _ =>
+        Map.empty
+    }
+    val constructor =
+      
Utils.classForName("org.apache.spark.sql.avro.AvroDataToCatalyst").getConstructors().head
+    val expr = constructor.newInstance(child, schemaValue, optionsValue)
+    expr.asInstanceOf[Expression]
+  }
+}
+
+/**
+ * Converts a Catalyst binary input value into its corresponding Avro 
format result.
+ * This is a thin wrapper over the [[CatalystDataToAvro]] class to create a 
SQL function.
+ *
+ * @param child the Catalyst binary input column.
+ * @param jsonFormatSchema the Avro schema in JSON string format.
+ *
+ * @since 4.0.0
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(child, jsonFormatSchema) - Converts a Catalyst binary input value 
into its corresponding
+      Avro format result.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ 
"name": "u", "type": ["int","string"] }]}', MAP()) IS NULL FROM (SELECT NULL AS 
s);
+       [true]
+  """,
+  group = "misc_funcs",
+  since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class ToAvro(child: Expression, jsonFormatSchema: Expression)
+  extends BinaryExpression with RuntimeReplaceable {
+  override def left: Expression = child
+
+  override def right: Expression = jsonFormatSchema
+
+  override def withNewChildrenInternal(newLeft: Expression, newRight: 
Expression): Expression = {
+    copy(child = newLeft, jsonFormatSchema = newRight)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    jsonFormatSchema.dataType match {
+      case _: StringType if jsonFormatSchema.foldable =>
+        TypeCheckResult.TypeCheckSuccess
+      case _ =>
+        TypeCheckResult.TypeCheckFailure(
+          "The second argument of the TO_AVRO SQL function must be a constant 
string " +
+            "containing the JSON representation of the schema to use for 
converting the value " +
+            "to AVRO format")
+    }
+  }
+
+  override def replacement: Expression = {
+    val schemaValue: Option[String] = jsonFormatSchema.eval() match {
+      case null =>
+        None
+      case s: UTF8String =>
+        Some(s.toString)
+    }
+    val constructor =
+      
Utils.classForName("org.apache.spark.sql.avro.CatalystDataToAvro").getConstructors().head
+    val expr = constructor.newInstance(child, schemaValue)
+    expr.asInstanceOf[Expression]
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
index 73b2eba7060d..443597f10056 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
@@ -117,6 +117,10 @@ class ExpressionsSchemaSuite extends QueryTest with 
SharedSparkSession {
         // Note: We need to filter out the commands that set the parameters, 
such as:
         // SET spark.sql.parser.escapedStringLiterals=true
         example.split("  > 
").tail.filterNot(_.trim.startsWith("SET")).take(1).foreach {
+          case _ if funcName == "from_avro" || funcName == "to_avro" =>
+            // Skip running the example queries for the from_avro and to_avro 
functions because
+            // these functions dynamically load the AvroDataToCatalyst or 
CatalystDataToAvro classes
+            // which are not available in this test.
           case exampleRe(sql, _) =>
             val df = spark.sql(sql)
             val escapedSql = sql.replaceAll("\\|", "&#124;")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index e80f4af1dc46..bf5d1b24af21 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -225,6 +225,9 @@ class ExpressionInfoSuite extends SparkFunSuite with 
SharedSparkSession {
       // Throws an error
       "org.apache.spark.sql.catalyst.expressions.RaiseErrorExpressionBuilder",
       "org.apache.spark.sql.catalyst.expressions.AssertTrue",
+      // Requires dynamic class loading not available in this test suite.
+      "org.apache.spark.sql.catalyst.expressions.FromAvro",
+      "org.apache.spark.sql.catalyst.expressions.ToAvro",
       classOf[CurrentUser].getName,
       // The encrypt expression includes a random initialization vector to its 
encrypted result
       classOf[AesEncrypt].getName)
diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py
index 053e11d10295..dc48a5a6155e 100644
--- a/sql/gen-sql-functions-docs.py
+++ b/sql/gen-sql-functions-docs.py
@@ -163,7 +163,8 @@ def _make_pretty_examples(jspark, infos):
 
     pretty_output = ""
     for info in infos:
-        if info.examples.startswith("\n    Examples:"):
+        if (info.examples.startswith("\n    Examples:")
+                and info.name.lower() not in ("from_avro", "to_avro")):
             output = []
             output.append("-- %s" % info.name)
             query_examples = filter(lambda x: x.startswith("      > "), 
info.examples.split("\n"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to