This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new be0780b8f931 [SPARK-50350][SQL] Avro: add new function 
`schema_of_avro` (`scala` side)
be0780b8f931 is described below

commit be0780b8f931abda193ee69f65caf625f7118cb4
Author: panbingkun <[email protected]>
AuthorDate: Fri Dec 6 13:10:07 2024 -0800

    [SPARK-50350][SQL] Avro: add new function `schema_of_avro` (`scala` side)
    
    ### What changes were proposed in this pull request?
    This PR adds a new function, `schema_of_avro`, for `avro`.
    
    ### Why are the changes needed?
    - Avro's schema format differs from Spark's when presented to end users.
      This function helps end users understand an Avro schema intuitively.
    - Similar functions already exist for other data formats, such as `csv`,
      `json` and `xml`:
    
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L872-L875
    
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L836-L839
    
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L877-L880
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Through the function `schema_of_avro`, end users can clearly see what
    an Avro schema looks like in `Spark`.
    
    ### How was this patch tested?
    - Add new UT.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48889 from panbingkun/SPARK-50350.
    
    Authored-by: panbingkun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/avro/AvroExpressionEvalUtils.scala   | 45 +++++++++++
 .../org/apache/spark/sql/avro/SchemaOfAvro.scala   | 71 +++++++++++++++++
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala | 36 +++++++++
 .../org/apache/spark/sql/avro/functions.scala      | 28 +++++++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  1 +
 ...roSqlFunctions.scala => avroSqlFunctions.scala} | 93 ++++++++++++++++++++++
 .../spark/sql/CollationExpressionWalkerSuite.scala |  1 +
 .../apache/spark/sql/ExpressionsSchemaSuite.scala  |  3 +-
 .../sql/expressions/ExpressionInfoSuite.scala      |  1 +
 9 files changed, 278 insertions(+), 1 deletion(-)

diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroExpressionEvalUtils.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroExpressionEvalUtils.scala
new file mode 100644
index 000000000000..1a9a3609c8a5
--- /dev/null
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroExpressionEvalUtils.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.avro.Schema
+
+import org.apache.spark.sql.catalyst.util.{ParseMode, PermissiveMode}
+import org.apache.spark.unsafe.types.UTF8String
+
+object AvroExpressionEvalUtils {
+
+  def schemaOfAvro(
+      avroOptions: AvroOptions,
+      parseMode: ParseMode,
+      expectedSchema: Schema): UTF8String = {
+    val dt = SchemaConverters.toSqlType(
+      expectedSchema,
+      avroOptions.useStableIdForUnionType,
+      avroOptions.stableIdPrefixForUnionType,
+      avroOptions.recursiveFieldMaxDepth).dataType
+    val schema = parseMode match {
+      // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
+      // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
+      // Therefore we force the schema to be all nullable here.
+      case PermissiveMode => dt.asNullable
+      case _ => dt
+    }
+    UTF8String.fromString(schema.sql)
+  }
+}
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaOfAvro.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaOfAvro.scala
new file mode 100644
index 000000000000..094fd4254e16
--- /dev/null
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaOfAvro.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.avro.Schema
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, 
Literal, RuntimeReplaceable}
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, 
PermissiveMode}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, ObjectType}
+
+private[sql] case class SchemaOfAvro(
+    jsonFormatSchema: String,
+    options: Map[String, String])
+  extends LeafExpression with RuntimeReplaceable {
+
+  override def dataType: DataType = SQLConf.get.defaultStringType
+
+  override def nullable: Boolean = false
+
+  @transient private lazy val avroOptions = AvroOptions(options)
+
+  @transient private lazy val actualSchema =
+    new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)
+
+  @transient private lazy val expectedSchema = 
avroOptions.schema.getOrElse(actualSchema)
+
+  @transient private lazy val parseMode: ParseMode = {
+    val mode = avroOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw QueryCompilationErrors.parseModeUnsupportedError(
+        prettyName, mode
+      )
+    }
+    mode
+  }
+
+  override def prettyName: String = "schema_of_avro"
+
+  @transient private lazy val avroOptionsObjectType = 
ObjectType(classOf[AvroOptions])
+  @transient private lazy val parseModeObjectType = 
ObjectType(classOf[ParseMode])
+  @transient private lazy val schemaObjectType = ObjectType(classOf[Schema])
+
+  override def replacement: Expression = StaticInvoke(
+    AvroExpressionEvalUtils.getClass,
+    dataType,
+    "schemaOfAvro",
+    Seq(
+      Literal(avroOptions, avroOptionsObjectType),
+      Literal(parseMode, parseModeObjectType),
+      Literal(expectedSchema, schemaObjectType)),
+    Seq(avroOptionsObjectType, parseModeObjectType, schemaObjectType)
+  )
+}
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 096cdfe0b9ee..8c128d4c7ea6 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -629,4 +629,40 @@ class AvroFunctionsSuite extends QueryTest with 
SharedSparkSession {
       assert(readbackPerson2.get(2).toString === person2.get(2))
     }
   }
+
+  test("schema_of_avro") {
+    val df = spark.range(1)
+    val avroIntType = s"""
+      |{
+      |  "type": "int",
+      |  "name": "id"
+      |}""".stripMargin
+    checkAnswer(df.select(functions.schema_of_avro(avroIntType)), Row("INT"))
+
+    val avroStructType =
+      """
+        |{
+        |  "type": "record",
+        |  "name": "person",
+        |  "fields": [
+        |    {"name": "name", "type": "string"},
+        |    {"name": "age", "type": "int"},
+        |    {"name": "country", "type": "string"}
+        |  ]
+        |}""".stripMargin
+    checkAnswer(df.select(functions.schema_of_avro(avroStructType)),
+      Row("STRUCT<name: STRING NOT NULL, age: INT NOT NULL, country: STRING 
NOT NULL>"))
+
+    val avroMultiType =
+      """
+        |{
+        |  "type": "record",
+        |  "name": "person",
+        |  "fields": [
+        |     {"name": "u", "type": ["int", "string"]}
+        |  ]
+        |}""".stripMargin
+    checkAnswer(df.select(functions.schema_of_avro(avroMultiType)),
+      Row("STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>"))
+  }
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
index fffad557aca5..e30a9e7c2ba0 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -94,4 +94,32 @@ object functions {
   def to_avro(data: Column, jsonFormatSchema: String): Column = {
     Column.fn("to_avro", data, lit(jsonFormatSchema))
   }
+
+  /**
+   * Returns schema in the DDL format of the avro schema in JSON string format.
+   *
+   * @param jsonFormatSchema
+   *   the avro schema in JSON string format.
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def schema_of_avro(jsonFormatSchema: String): Column = {
+    Column.fn("schema_of_avro", lit(jsonFormatSchema))
+  }
+
+  /**
+   * Returns schema in the DDL format of the avro schema in JSON string format.
+   *
+   * @param jsonFormatSchema
+   *   the avro schema in JSON string format.
+   * @param options
+   *   options to control how the Avro record is parsed.
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String, 
String]): Column = {
+    Column.fnWithOptions("schema_of_avro", options.asScala.iterator, 
lit(jsonFormatSchema))
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d9e9f49ce065..54f6820d2091 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -884,6 +884,7 @@ object FunctionRegistry {
     // Avro
     expression[FromAvro]("from_avro"),
     expression[ToAvro]("to_avro"),
+    expression[SchemaOfAvro]("schema_of_avro"),
 
     // Protobuf
     expression[FromProtobuf]("from_protobuf"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/avroSqlFunctions.scala
similarity index 69%
rename from 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
rename to 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/avroSqlFunctions.scala
index 457f469e0f68..6693ee83fd4a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/avroSqlFunctions.scala
@@ -200,3 +200,96 @@ case class ToAvro(child: Expression, jsonFormatSchema: 
Expression)
   override def prettyName: String =
     getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("to_avro")
 }
+
+/**
+ * Returns schema in the DDL format of the avro schema in JSON string format.
+ * This is a thin wrapper over the [[SchemaOfAvro]] class to create a SQL 
function.
+ *
+ * @param jsonFormatSchema the Avro schema in JSON string format.
+ * @param options the options to use when performing the conversion.
+ *
+ * @since 4.0.0
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+    _FUNC_(jsonFormatSchema, options) - Returns schema in the DDL format of 
the avro schema in JSON string format.
+    """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_('{"type": "record", "name": "struct", "fields": 
[{"name": "u", "type": ["int", "string"]}]}', map());
+       STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>
+  """,
+  group = "misc_funcs",
+  since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class SchemaOfAvro(jsonFormatSchema: Expression, options: Expression)
+  extends BinaryExpression with RuntimeReplaceable {
+
+  override def left: Expression = jsonFormatSchema
+  override def right: Expression = options
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression, newRight: Expression): Expression =
+    copy(jsonFormatSchema = newLeft, options = newRight)
+
+  def this(jsonFormatSchema: Expression) =
+    this(jsonFormatSchema, Literal.create(null))
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val schemaCheck = jsonFormatSchema.dataType match {
+      case _: StringType |
+           _: NullType
+        if jsonFormatSchema.foldable =>
+        None
+      case _ =>
+        Some(TypeCheckResult.TypeCheckFailure("The first argument of the 
SCHEMA_OF_AVRO SQL " +
+          "function must be a constant string containing the JSON 
representation of the schema " +
+          "to use for converting the value from AVRO format"))
+    }
+    val optionsCheck = options.dataType match {
+      case MapType(StringType, StringType, _) |
+           MapType(NullType, NullType, _) |
+           _: NullType
+        if options.foldable =>
+        None
+      case _ =>
+        Some(TypeCheckResult.TypeCheckFailure("The second argument of the 
SCHEMA_OF_AVRO SQL " +
+          "function must be a constant map of strings to strings containing 
the options to use " +
+          "for converting the value from AVRO format"))
+    }
+    schemaCheck.getOrElse(
+      optionsCheck.getOrElse(
+        TypeCheckResult.TypeCheckSuccess))
+  }
+
+  override lazy val replacement: Expression = {
+    val schemaValue: String = jsonFormatSchema.eval() match {
+      case s: UTF8String =>
+        s.toString
+      case null =>
+        ""
+    }
+    val optionsValue: Map[String, String] = options.eval() match {
+      case a: ArrayBasedMapData if a.keyArray.array.nonEmpty =>
+        val keys: Array[String] = a.keyArray.array.map(_.toString)
+        val values: Array[String] = a.valueArray.array.map(_.toString)
+        keys.zip(values).toMap
+      case _ =>
+        Map.empty
+    }
+    val constructor = try {
+      
Utils.classForName("org.apache.spark.sql.avro.SchemaOfAvro").getConstructors.head
+    } catch {
+      case _: java.lang.ClassNotFoundException =>
+        throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(
+          functionName = "SCHEMA_OF_AVRO")
+    }
+    val expr = constructor.newInstance(schemaValue, optionsValue)
+    expr.asInstanceOf[Expression]
+  }
+
+  override def prettyName: String =
+    getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("schema_of_avro")
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
index e3622c310185..d0581621148a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
@@ -729,6 +729,7 @@ class CollationExpressionWalkerSuite extends SparkFunSuite 
with SharedSparkSessi
       // other functions which are not yet supported
       "to_avro",
       "from_avro",
+      "schema_of_avro",
       "to_protobuf",
       "from_protobuf"
     )
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
index 8c0231fddf39..0468ceb9f967 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
@@ -118,7 +118,8 @@ class ExpressionsSchemaSuite extends QueryTest with 
SharedSparkSession {
         // SET spark.sql.parser.escapedStringLiterals=true
         example.split("  > 
").tail.filterNot(_.trim.startsWith("SET")).take(1).foreach {
           case _ if funcName == "from_avro" || funcName == "to_avro" ||
-            funcName == "from_protobuf" || funcName == "to_protobuf" =>
+            funcName == "schema_of_avro" || funcName == "from_protobuf" ||
+            funcName == "to_protobuf" =>
               // Skip running the example queries for the from_avro, to_avro, 
from_protobuf and
               // to_protobuf functions because these functions dynamically 
load the
               // AvroDataToCatalyst or CatalystDataToAvro classes which are 
not available in this
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index a6fc43aa087d..c00f00ceaa35 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -229,6 +229,7 @@ class ExpressionInfoSuite extends SparkFunSuite with 
SharedSparkSession {
       // Requires dynamic class loading not available in this test suite.
       "org.apache.spark.sql.catalyst.expressions.FromAvro",
       "org.apache.spark.sql.catalyst.expressions.ToAvro",
+      "org.apache.spark.sql.catalyst.expressions.SchemaOfAvro",
       "org.apache.spark.sql.catalyst.expressions.FromProtobuf",
       "org.apache.spark.sql.catalyst.expressions.ToProtobuf",
       classOf[CurrentUser].getName,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to