This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new be0780b8f931 [SPARK-50350][SQL] Avro: add new function
`schema_of_avro` (`scala` side)
be0780b8f931 is described below
commit be0780b8f931abda193ee69f65caf625f7118cb4
Author: panbingkun <[email protected]>
AuthorDate: Fri Dec 6 13:10:07 2024 -0800
[SPARK-50350][SQL] Avro: add new function `schema_of_avro` (`scala` side)
### What changes were proposed in this pull request?
This PR adds a new function, `schema_of_avro`, for `avro`.
### Why are the changes needed?
- The schema format of Avro is different from that of Spark when presented
to end users. This function makes an Avro schema easier for end users to
understand by showing it in Spark's schema format.
- Similar functions already exist for other data formats, such as `csv`,
`json` and `xml`:
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L872-L875
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L836-L839
https://github.com/apache/spark/blob/87a5b37ec3c4b383a5938144612c07187d597ff8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L877-L880
### Does this PR introduce _any_ user-facing change?
Yes, end users can now use the function `schema_of_avro` to see what an
Avro schema looks like in `Spark`'s schema format.
### How was this patch tested?
- Added a new unit test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48889 from panbingkun/SPARK-50350.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/avro/AvroExpressionEvalUtils.scala | 45 +++++++++++
.../org/apache/spark/sql/avro/SchemaOfAvro.scala | 71 +++++++++++++++++
.../apache/spark/sql/avro/AvroFunctionsSuite.scala | 36 +++++++++
.../org/apache/spark/sql/avro/functions.scala | 28 +++++++
.../sql/catalyst/analysis/FunctionRegistry.scala | 1 +
...roSqlFunctions.scala => avroSqlFunctions.scala} | 93 ++++++++++++++++++++++
.../spark/sql/CollationExpressionWalkerSuite.scala | 1 +
.../apache/spark/sql/ExpressionsSchemaSuite.scala | 3 +-
.../sql/expressions/ExpressionInfoSuite.scala | 1 +
9 files changed, 278 insertions(+), 1 deletion(-)
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroExpressionEvalUtils.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroExpressionEvalUtils.scala
new file mode 100644
index 000000000000..1a9a3609c8a5
--- /dev/null
+++
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroExpressionEvalUtils.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.avro.Schema
+
+import org.apache.spark.sql.catalyst.util.{ParseMode, PermissiveMode}
+import org.apache.spark.unsafe.types.UTF8String
+
+object AvroExpressionEvalUtils {
+ /**
+ * Renders the Catalyst data type that corresponds to an Avro schema as a
+ * SQL type string.
+ *
+ * @param avroOptions supplies the union-type and recursive-field settings
+ * that are passed to `SchemaConverters.toSqlType`.
+ * @param parseMode when it is `PermissiveMode`, the converted type is made
+ * fully nullable; any other mode leaves it unchanged.
+ * @param expectedSchema the Avro schema to convert.
+ * @return the `sql` rendering of the resulting data type.
+ */
+ def schemaOfAvro(
+ avroOptions: AvroOptions,
+ parseMode: ParseMode,
+ expectedSchema: Schema): UTF8String = {
+ val dt = SchemaConverters.toSqlType(
+ expectedSchema,
+ avroOptions.useStableIdForUnionType,
+ avroOptions.stableIdPrefixForUnionType,
+ avroOptions.recursiveFieldMaxDepth).dataType
+ val schema = parseMode match {
+ // With PermissiveMode, the output Catalyst row might contain columns of
null values for
+ // corrupt records, even if some of the columns are not nullable in the
user-provided schema.
+ // Therefore we force the schema to be all nullable here.
+ case PermissiveMode => dt.asNullable
+ case _ => dt
+ }
+ UTF8String.fromString(schema.sql)
+ }
+}
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaOfAvro.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaOfAvro.scala
new file mode 100644
index 000000000000..094fd4254e16
--- /dev/null
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaOfAvro.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.avro.Schema
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression,
Literal, RuntimeReplaceable}
+import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode,
PermissiveMode}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, ObjectType}
+/**
+ * Leaf expression that reports, as a SQL type string, the Spark schema
+ * that corresponds to an Avro schema. It is a `RuntimeReplaceable`: the
+ * work is done by a `StaticInvoke` of
+ * `AvroExpressionEvalUtils.schemaOfAvro`.
+ *
+ * @param jsonFormatSchema the Avro schema in JSON format; it is parsed
+ * with default-value validation turned off.
+ * @param options used to build the `AvroOptions` for the conversion.
+ */
+private[sql] case class SchemaOfAvro(
+ jsonFormatSchema: String,
+ options: Map[String, String])
+ extends LeafExpression with RuntimeReplaceable {
+
+ override def dataType: DataType = SQLConf.get.defaultStringType
+
+ override def nullable: Boolean = false
+
+ @transient private lazy val avroOptions = AvroOptions(options)
+
+ @transient private lazy val actualSchema =
+ new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)
+ // A schema defined in the options wins over the parsed jsonFormatSchema.
+ @transient private lazy val expectedSchema =
avroOptions.schema.getOrElse(actualSchema)
+ // Only PermissiveMode and FailFastMode are accepted; others are rejected.
+ @transient private lazy val parseMode: ParseMode = {
+ val mode = avroOptions.parseMode
+ if (mode != PermissiveMode && mode != FailFastMode) {
+ throw QueryCompilationErrors.parseModeUnsupportedError(
+ prettyName, mode
+ )
+ }
+ mode
+ }
+
+ override def prettyName: String = "schema_of_avro"
+
+ @transient private lazy val avroOptionsObjectType =
ObjectType(classOf[AvroOptions])
+ @transient private lazy val parseModeObjectType =
ObjectType(classOf[ParseMode])
+ @transient private lazy val schemaObjectType = ObjectType(classOf[Schema])
+ // The options, parse mode and schema are passed as object-typed literals.
+ override def replacement: Expression = StaticInvoke(
+ AvroExpressionEvalUtils.getClass,
+ dataType,
+ "schemaOfAvro",
+ Seq(
+ Literal(avroOptions, avroOptionsObjectType),
+ Literal(parseMode, parseModeObjectType),
+ Literal(expectedSchema, schemaObjectType)),
+ Seq(avroOptionsObjectType, parseModeObjectType, schemaObjectType)
+ )
+}
diff --git
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 096cdfe0b9ee..8c128d4c7ea6 100644
---
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -629,4 +629,40 @@ class AvroFunctionsSuite extends QueryTest with
SharedSparkSession {
assert(readbackPerson2.get(2).toString === person2.get(2))
}
}
+ // Covers a primitive type, a record, and a record with a union-typed field.
+ test("schema_of_avro") {
+ val df = spark.range(1)
+ val avroIntType = s"""
+ |{
+ | "type": "int",
+ | "name": "id"
+ |}""".stripMargin
+ checkAnswer(df.select(functions.schema_of_avro(avroIntType)), Row("INT"))
+
+ val avroStructType =
+ """
+ |{
+ | "type": "record",
+ | "name": "person",
+ | "fields": [
+ | {"name": "name", "type": "string"},
+ | {"name": "age", "type": "int"},
+ | {"name": "country", "type": "string"}
+ | ]
+ |}""".stripMargin
+ checkAnswer(df.select(functions.schema_of_avro(avroStructType)),
+ Row("STRUCT<name: STRING NOT NULL, age: INT NOT NULL, country: STRING
NOT NULL>"))
+
+ val avroMultiType =
+ """
+ |{
+ | "type": "record",
+ | "name": "person",
+ | "fields": [
+ | {"name": "u", "type": ["int", "string"]}
+ | ]
+ |}""".stripMargin
+ checkAnswer(df.select(functions.schema_of_avro(avroMultiType)),
+ Row("STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>"))
+ }
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
b/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
index fffad557aca5..e30a9e7c2ba0 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -94,4 +94,32 @@ object functions {
def to_avro(data: Column, jsonFormatSchema: String): Column = {
Column.fn("to_avro", data, lit(jsonFormatSchema))
}
+
+ /**
+ * Returns the schema, as a DDL-formatted string, that corresponds to the
+ * given Avro schema. For example, the Avro schema
+ * `{"type": "int", "name": "id"}` yields `INT`.
+ *
+ * @param jsonFormatSchema
+ * the avro schema in JSON string format.
+ * @return
+ * a column that applies the `schema_of_avro` function to the schema
+ * literal.
+ *
+ * @since 4.0.0
+ */
+ @Experimental
+ def schema_of_avro(jsonFormatSchema: String): Column = {
+ Column.fn("schema_of_avro", lit(jsonFormatSchema))
+ }
+
+ /**
+ * Returns the schema, as a DDL-formatted string, that corresponds to the
+ * given Avro schema, using the supplied options for the conversion.
+ *
+ * @param jsonFormatSchema
+ * the avro schema in JSON string format.
+ * @param options
+ * options that control how the Avro schema is converted; they are passed
+ * to the `schema_of_avro` function as key/value pairs.
+ * @return
+ * a column that applies the `schema_of_avro` function to the schema
+ * literal and the options.
+ *
+ * @since 4.0.0
+ */
+ @Experimental
+ def schema_of_avro(jsonFormatSchema: String, options: java.util.Map[String,
String]): Column = {
+ Column.fnWithOptions("schema_of_avro", options.asScala.iterator,
lit(jsonFormatSchema))
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d9e9f49ce065..54f6820d2091 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -884,6 +884,7 @@ object FunctionRegistry {
// Avro
expression[FromAvro]("from_avro"),
expression[ToAvro]("to_avro"),
+ expression[SchemaOfAvro]("schema_of_avro"),
// Protobuf
expression[FromProtobuf]("from_protobuf"),
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/avroSqlFunctions.scala
similarity index 69%
rename from
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
rename to
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/avroSqlFunctions.scala
index 457f469e0f68..6693ee83fd4a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/avroSqlFunctions.scala
@@ -200,3 +200,96 @@ case class ToAvro(child: Expression, jsonFormatSchema:
Expression)
override def prettyName: String =
getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("to_avro")
}
+
+/**
+ * SQL-facing `schema_of_avro` expression: returns, in DDL format, the
+ * schema that corresponds to an Avro schema given as a JSON string. It is a
+ * thin wrapper that is replaced at runtime by the
+ * `org.apache.spark.sql.avro.SchemaOfAvro` expression, which is looked up
+ * reflectively by class name.
+ *
+ * @param jsonFormatSchema the Avro schema in JSON string format; must be a
+ * foldable string (or null).
+ * @param options the options to use when performing the conversion; must be
+ * a foldable map of strings to strings (or null).
+ *
+ * @since 4.0.0
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+ _FUNC_(jsonFormatSchema, options) - Returns schema in the DDL format of
the avro schema in JSON string format.
+ """,
+ examples = """
+ Examples:
+ > SELECT _FUNC_('{"type": "record", "name": "struct", "fields":
[{"name": "u", "type": ["int", "string"]}]}', map());
+ STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>
+ """,
+ group = "misc_funcs",
+ since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class SchemaOfAvro(jsonFormatSchema: Expression, options: Expression)
+ extends BinaryExpression with RuntimeReplaceable {
+
+ override def left: Expression = jsonFormatSchema
+ override def right: Expression = options
+
+ override protected def withNewChildrenInternal(
+ newLeft: Expression, newRight: Expression): Expression =
+ copy(jsonFormatSchema = newLeft, options = newRight)
+ // Single-argument form: the options default to a null literal.
+ def this(jsonFormatSchema: Expression) =
+ this(jsonFormatSchema, Literal.create(null))
+ /**
+ * Accepts a foldable schema of string or null type, and foldable options
+ * that are a string-to-string map, a map whose key and value types are
+ * both null, or null. A schema failure is reported in preference to an
+ * options failure.
+ */
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val schemaCheck = jsonFormatSchema.dataType match {
+ case _: StringType |
+ _: NullType
+ if jsonFormatSchema.foldable =>
+ None
+ case _ =>
+ Some(TypeCheckResult.TypeCheckFailure("The first argument of the
SCHEMA_OF_AVRO SQL " +
+ "function must be a constant string containing the JSON
representation of the schema " +
+ "to use for converting the value from AVRO format"))
+ }
+ val optionsCheck = options.dataType match {
+ case MapType(StringType, StringType, _) |
+ MapType(NullType, NullType, _) |
+ _: NullType
+ if options.foldable =>
+ None
+ case _ =>
+ Some(TypeCheckResult.TypeCheckFailure("The second argument of the
SCHEMA_OF_AVRO SQL " +
+ "function must be a constant map of strings to strings containing
the options to use " +
+ "for converting the value from AVRO format"))
+ }
+ schemaCheck.getOrElse(
+ optionsCheck.getOrElse(
+ TypeCheckResult.TypeCheckSuccess))
+ }
+ /**
+ * Evaluates both constant arguments, then reflectively instantiates
+ * `org.apache.spark.sql.avro.SchemaOfAvro` with the schema string and the
+ * options map. A null schema becomes the empty string; options that are
+ * not a non-empty `ArrayBasedMapData` become `Map.empty`. If the class
+ * cannot be found, the "avro not loaded" compilation error is thrown.
+ * NOTE(review): this uses the first entry of `getConstructors`, which
+ * presumably relies on the target class having a single public
+ * constructor - confirm.
+ */
+ override lazy val replacement: Expression = {
+ val schemaValue: String = jsonFormatSchema.eval() match {
+ case s: UTF8String =>
+ s.toString
+ case null =>
+ ""
+ }
+ val optionsValue: Map[String, String] = options.eval() match {
+ case a: ArrayBasedMapData if a.keyArray.array.nonEmpty =>
+ val keys: Array[String] = a.keyArray.array.map(_.toString)
+ val values: Array[String] = a.valueArray.array.map(_.toString)
+ keys.zip(values).toMap
+ case _ =>
+ Map.empty
+ }
+ val constructor = try {
+
Utils.classForName("org.apache.spark.sql.avro.SchemaOfAvro").getConstructors.head
+ } catch {
+ case _: java.lang.ClassNotFoundException =>
+ throw QueryCompilationErrors.avroNotLoadedSqlFunctionsUnusable(
+ functionName = "SCHEMA_OF_AVRO")
+ }
+ val expr = constructor.newInstance(schemaValue, optionsValue)
+ expr.asInstanceOf[Expression]
+ }
+
+ override def prettyName: String =
+ getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("schema_of_avro")
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
index e3622c310185..d0581621148a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/CollationExpressionWalkerSuite.scala
@@ -729,6 +729,7 @@ class CollationExpressionWalkerSuite extends SparkFunSuite
with SharedSparkSessi
// other functions which are not yet supported
"to_avro",
"from_avro",
+ "schema_of_avro",
"to_protobuf",
"from_protobuf"
)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
index 8c0231fddf39..0468ceb9f967 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
@@ -118,7 +118,8 @@ class ExpressionsSchemaSuite extends QueryTest with
SharedSparkSession {
// SET spark.sql.parser.escapedStringLiterals=true
example.split(" >
").tail.filterNot(_.trim.startsWith("SET")).take(1).foreach {
case _ if funcName == "from_avro" || funcName == "to_avro" ||
- funcName == "from_protobuf" || funcName == "to_protobuf" =>
+ funcName == "schema_of_avro" || funcName == "from_protobuf" ||
+ funcName == "to_protobuf" =>
// Skip running the example queries for the from_avro, to_avro,
from_protobuf and
// to_protobuf functions because these functions dynamically
load the
// AvroDataToCatalyst or CatalystDataToAvro classes which are
not available in this
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index a6fc43aa087d..c00f00ceaa35 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -229,6 +229,7 @@ class ExpressionInfoSuite extends SparkFunSuite with
SharedSparkSession {
// Requires dynamic class loading not available in this test suite.
"org.apache.spark.sql.catalyst.expressions.FromAvro",
"org.apache.spark.sql.catalyst.expressions.ToAvro",
+ "org.apache.spark.sql.catalyst.expressions.SchemaOfAvro",
"org.apache.spark.sql.catalyst.expressions.FromProtobuf",
"org.apache.spark.sql.catalyst.expressions.ToProtobuf",
classOf[CurrentUser].getName,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]