This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b1677a425408 [SPARK-48545][SQL] Create to_avro and from_avro SQL functions to match DataFrame equivalents
b1677a425408 is described below
commit b1677a4254088bbe851170884d8364b730741826
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Fri Jun 21 12:12:04 2024 -0700
[SPARK-48545][SQL] Create to_avro and from_avro SQL functions to match DataFrame equivalents
### What changes were proposed in this pull request?
This PR creates two new SQL functions, `to_avro` and `from_avro`, to match the existing DataFrame equivalents.
For example:
```
sql(
  """
    |create table t as
    |  select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
    |  from values (1, null), (null, 'a') tab(member0, member1)
    |""".stripMargin)

val jsonFormatSchema =
  """
    |{
    |  "type": "record",
    |  "name": "struct",
    |  "fields": [{
    |    "name": "u",
    |    "type": ["int","string"]
    |  }]
    |}
    |""".stripMargin

spark.sql(
  s"""
     |select from_avro(result, '$jsonFormatSchema', map()).u from (
     |  select to_avro(s, '$jsonFormatSchema') as result from t
     |)
     |""".stripMargin)
  .collect()

> {1, NULL}
  {NULL, "a"}
```
### Why are the changes needed?
This brings parity between SQL and DataFrame APIs in Apache Spark.
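For comparison, the pre-existing DataFrame-side equivalents live in `org.apache.spark.sql.avro.functions`. Below is a minimal sketch of the same round trip through that API; it assumes the table `t` and the `jsonFormatSchema` string from the example above, and a session (e.g. `spark-shell`) with the spark-avro module on the classpath.
```
// Sketch of the existing DataFrame API that these SQL functions mirror.
// Assumes table `t` and the Avro schema string `jsonFormatSchema` from above.
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.col

val encoded = spark.table("t")
  .select(to_avro(col("s"), jsonFormatSchema).as("result"))
val decoded = encoded
  .select(from_avro(col("result"), jsonFormatSchema).as("s"))
decoded.select("s.u").collect()  // same two rows as the SQL example: {1, NULL}, {NULL, "a"}
```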
### Does this PR introduce _any_ user-facing change?
Yes, see above.
### How was this patch tested?
This PR adds extra unit tests, and I also checked that the functions work
with `spark-shell`.
### Was this patch authored or co-authored using generative AI tooling?
No GitHub Copilot usage this time.
Closes #46977 from dtenedor/from-avro.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/sql/avro/AvroFunctionsSuite.scala | 83 +++++++++-
.../sql/catalyst/analysis/FunctionRegistry.scala | 6 +-
.../expressions/toFromAvroSqlFunctions.scala | 175 +++++++++++++++++++++
.../apache/spark/sql/ExpressionsSchemaSuite.scala | 4 +
.../sql/expressions/ExpressionInfoSuite.scala | 3 +
sql/gen-sql-functions-docs.py | 3 +-
6 files changed, 271 insertions(+), 3 deletions(-)
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index d16ddb497320..c807685db0f0 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -26,7 +26,7 @@ import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecord
import org.apache.avro.io.EncoderFactory
import org.apache.spark.SparkException
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.internal.SQLConf
@@ -286,4 +286,85 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
assert(msg.contains("Invalid default for field id: null not a \"long\""))
}
}
+
+ test("SPARK-48545: from_avro and to_avro SQL functions") {
+ withTable("t") {
+ sql(
+ """
+ |create table t as
+          |  select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s
+ | from values (1, null), (null, 'a') tab(member0, member1)
+ |""".stripMargin)
+ val jsonFormatSchema =
+ """
+ |{
+ | "type": "record",
+ | "name": "struct",
+ | "fields": [{
+ | "name": "u",
+ | "type": ["int","string"]
+ | }]
+ |}
+ |""".stripMargin
+ val toAvroSql =
+ s"""
+ |select to_avro(s, '$jsonFormatSchema') as result from t
+ |""".stripMargin
+ val avroResult = spark.sql(toAvroSql).collect()
+ assert(avroResult != null)
+ checkAnswer(
+        spark.sql(s"select from_avro(result, '$jsonFormatSchema', map()).u from ($toAvroSql)"),
+ Seq(Row(Row(1, null)),
+ Row(Row(null, "a"))))
+
+ // Negative tests.
+ checkError(
+ exception = intercept[AnalysisException](sql(
+ s"""
+ |select to_avro(s, 42) as result from t
+ |""".stripMargin)),
+ errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+ parameters = Map("sqlExpr" -> "\"toavro(s, 42)\"",
+        "msg" -> ("The second argument of the TO_AVRO SQL function must be a constant string " +
+          "containing the JSON representation of the schema to use for converting the value to " +
+ "AVRO format"),
+ "hint" -> ""),
+ queryContext = Array(ExpectedContext(
+ fragment = "to_avro(s, 42)",
+ start = 8,
+ stop = 21)))
+ checkError(
+ exception = intercept[AnalysisException](sql(
+ s"""
+ |select from_avro(s, 42, '') as result from t
+ |""".stripMargin)),
+ errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+ parameters = Map("sqlExpr" -> "\"fromavro(s, 42, )\"",
+        "msg" -> ("The second argument of the FROM_AVRO SQL function must be a constant string " +
+          "containing the JSON representation of the schema to use for converting the value " +
+ "from AVRO format"),
+ "hint" -> ""),
+ queryContext = Array(ExpectedContext(
+ fragment = "from_avro(s, 42, '')",
+ start = 8,
+ stop = 27)))
+ checkError(
+ exception = intercept[AnalysisException](sql(
+ s"""
+ |select from_avro(s, '$jsonFormatSchema', 42) as result from t
+ |""".stripMargin)),
+ errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
+ parameters = Map(
+ "sqlExpr" ->
+ s"\"fromavro(s, $jsonFormatSchema, 42)\"".stripMargin,
+        "msg" -> ("The third argument of the FROM_AVRO SQL function must be a constant map of " +
+          "strings to strings containing the options to use for converting the value " +
+ "from AVRO format"),
+ "hint" -> ""),
+ queryContext = Array(ExpectedContext(
+ fragment = s"from_avro(s, '$jsonFormatSchema', 42)",
+ start = 8,
+ stop = 138)))
+ }
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 3a418497fa53..20da1c030b53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -860,7 +860,11 @@ object FunctionRegistry {
// Xml
expression[XmlToStructs]("from_xml"),
expression[SchemaOfXml]("schema_of_xml"),
- expression[StructsToXml]("to_xml")
+ expression[StructsToXml]("to_xml"),
+
+ // Avro
+ expression[FromAvro]("from_avro"),
+ expression[ToAvro]("to_avro")
)
val builtin: SimpleFunctionRegistry = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
new file mode 100644
index 000000000000..507511a36007
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{MapType, NullType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Converts a binary column of Avro format into its corresponding Catalyst value.
+ * This is a thin wrapper over the [[AvroDataToCatalyst]] class to create a SQL function.
+ *
+ * @param child the Catalyst binary input column.
+ * @param jsonFormatSchema the Avro schema in JSON string format.
+ * @param options the options to use when performing the conversion.
+ *
+ * @since 4.0.0
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+    _FUNC_(child, jsonFormatSchema, options) - Converts a binary Avro value into a Catalyst value.
+ """,
+ examples = """
+ Examples:
+      > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', map()) IS NULL AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1));
+ [false]
+ """,
+ note = """
+    The specified schema must match actual schema of the read data, otherwise the behavior
+    is undefined: it may fail or return arbitrary result.
+    To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
+    set via the corresponding option.
+ """,
+ group = "misc_funcs",
+ since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Expression)
+ extends TernaryExpression with RuntimeReplaceable {
+ override def first: Expression = child
+ override def second: Expression = jsonFormatSchema
+ override def third: Expression = options
+
+ override def withNewChildrenInternal(
+      newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = {
+ copy(child = newFirst, jsonFormatSchema = newSecond, options = newThird)
+ }
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val schemaCheck = jsonFormatSchema.dataType match {
+ case _: StringType |
+ _: NullType
+ if jsonFormatSchema.foldable =>
+ None
+ case _ =>
+ Some(TypeCheckResult.TypeCheckFailure(
+          "The second argument of the FROM_AVRO SQL function must be a constant string " +
+            "containing the JSON representation of the schema to use for converting the value " +
+ "from AVRO format"))
+ }
+ val optionsCheck = options.dataType match {
+ case MapType(StringType, StringType, _) |
+ MapType(NullType, NullType, _) |
+ _: NullType
+ if options.foldable =>
+ None
+ case _ =>
+ Some(TypeCheckResult.TypeCheckFailure(
+          "The third argument of the FROM_AVRO SQL function must be a constant map of strings to " +
+            "strings containing the options to use for converting the value from AVRO format"))
+ }
+ schemaCheck.getOrElse(
+ optionsCheck.getOrElse(
+ TypeCheckResult.TypeCheckSuccess))
+ }
+
+ override def replacement: Expression = {
+ val schemaValue: String = jsonFormatSchema.eval() match {
+ case s: UTF8String =>
+ s.toString
+ case null =>
+ ""
+ }
+ val optionsValue: Map[String, String] = options.eval() match {
+ case a: ArrayBasedMapData if a.keyArray.array.nonEmpty =>
+ val keys: Array[String] = a.keyArray.array.map(_.toString)
+ val values: Array[String] = a.valueArray.array.map(_.toString)
+ keys.zip(values).toMap
+ case _ =>
+ Map.empty
+ }
+ val constructor =
+      Utils.classForName("org.apache.spark.sql.avro.AvroDataToCatalyst").getConstructors().head
+ val expr = constructor.newInstance(child, schemaValue, optionsValue)
+ expr.asInstanceOf[Expression]
+ }
+}
+
+/**
+ * Converts a Catalyst binary input value into its corresponding Avro format result.
+ * This is a thin wrapper over the [[CatalystDataToAvro]] class to create a SQL function.
+ *
+ * @param child the Catalyst binary input column.
+ * @param jsonFormatSchema the Avro schema in JSON string format.
+ *
+ * @since 4.0.0
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = """
+    _FUNC_(child, jsonFormatSchema) - Converts a Catalyst binary input value into its corresponding
+ Avro format result.
+ """,
+ examples = """
+ Examples:
+      > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}') IS NULL FROM (SELECT NULL AS s);
+ [true]
+ """,
+ group = "misc_funcs",
+ since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class ToAvro(child: Expression, jsonFormatSchema: Expression)
+ extends BinaryExpression with RuntimeReplaceable {
+ override def left: Expression = child
+
+ override def right: Expression = jsonFormatSchema
+
+  override def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = {
+ copy(child = newLeft, jsonFormatSchema = newRight)
+ }
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ jsonFormatSchema.dataType match {
+ case _: StringType if jsonFormatSchema.foldable =>
+ TypeCheckResult.TypeCheckSuccess
+ case _ =>
+ TypeCheckResult.TypeCheckFailure(
+        "The second argument of the TO_AVRO SQL function must be a constant string " +
+          "containing the JSON representation of the schema to use for converting the value " +
+ "to AVRO format")
+ }
+ }
+
+ override def replacement: Expression = {
+ val schemaValue: Option[String] = jsonFormatSchema.eval() match {
+ case null =>
+ None
+ case s: UTF8String =>
+ Some(s.toString)
+ }
+ val constructor =
+      Utils.classForName("org.apache.spark.sql.avro.CatalystDataToAvro").getConstructors().head
+ val expr = constructor.newInstance(child, schemaValue)
+ expr.asInstanceOf[Expression]
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
index 73b2eba7060d..443597f10056 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala
@@ -117,6 +117,10 @@ class ExpressionsSchemaSuite extends QueryTest with SharedSparkSession {
         // Note: We need to filter out the commands that set the parameters, such as:
// SET spark.sql.parser.escapedStringLiterals=true
        example.split("      > ").tail.filterNot(_.trim.startsWith("SET")).take(1).foreach {
+ case _ if funcName == "from_avro" || funcName == "to_avro" =>
+          // Skip running the example queries for the from_avro and to_avro functions because
+          // these functions dynamically load the AvroDataToCatalyst or CatalystDataToAvro classes
+ // which are not available in this test.
case exampleRe(sql, _) =>
val df = spark.sql(sql)
val escapedSql = sql.replaceAll("\\|", "|")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index e80f4af1dc46..bf5d1b24af21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -225,6 +225,9 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
// Throws an error
"org.apache.spark.sql.catalyst.expressions.RaiseErrorExpressionBuilder",
"org.apache.spark.sql.catalyst.expressions.AssertTrue",
+ // Requires dynamic class loading not available in this test suite.
+ "org.apache.spark.sql.catalyst.expressions.FromAvro",
+ "org.apache.spark.sql.catalyst.expressions.ToAvro",
classOf[CurrentUser].getName,
      // The encrypt expression includes a random initialization vector to its encrypted result
classOf[AesEncrypt].getName)
diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py
index 053e11d10295..dc48a5a6155e 100644
--- a/sql/gen-sql-functions-docs.py
+++ b/sql/gen-sql-functions-docs.py
@@ -163,7 +163,8 @@ def _make_pretty_examples(jspark, infos):
pretty_output = ""
for info in infos:
- if info.examples.startswith("\n Examples:"):
+ if (info.examples.startswith("\n Examples:")
+ and info.name.lower() not in ("from_avro", "to_avro")):
output = []
output.append("-- %s" % info.name)
        query_examples = filter(lambda x: x.startswith("      > "), info.examples.split("\n"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]