This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e69752aa66c0 [SPARK-45488][SQL] XML: Add support for value in 'rowTag' element
e69752aa66c0 is described below
commit e69752aa66c09df843adaebe86a63cf799961292
Author: Shujing Yang <[email protected]>
AuthorDate: Thu Oct 12 16:17:37 2023 +0900
[SPARK-45488][SQL] XML: Add support for value in 'rowTag' element
### What changes were proposed in this pull request?
The following XML with rowTag 'book' will yield a schema with just "_id"
column and not the value:
```
<p><book id="1">Great Book</book> </p>
```
Let's parse value as well. The scope of this PR is to keep the rowTag's
behavior of `valueTag` consistent with the inner objects.
### Why are the changes needed?
The semantics for attributes and `valueTag` should be consistent
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43319 from shujingyang-db/rootlevel-valuetag.
Lead-authored-by: Shujing Yang <[email protected]>
Co-authored-by: Shujing Yang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/catalyst/xml/StaxXmlParser.scala | 17 ++++-
.../spark/sql/catalyst/xml/XmlInferSchema.scala | 16 +++++
.../xml-resources/root-level-value-none.xml | 8 +++
.../test-data/xml-resources/root-level-value.xml | 9 +++
.../sql/execution/datasources/xml/XmlSuite.scala | 75 ++++++++++++++++++++++
5 files changed, 123 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index ac29e234e5f9..dcb760aca9d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -103,7 +103,12 @@ class StaxXmlParser(
}
val parser = StaxXmlParserUtils.filteredReader(xml)
val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
- Some(convertObject(parser, schema, options, rootAttributes))
+ // A structure object is an attribute-only element
+ // if it only consists of attributes and valueTags.
+ val isRootAttributesOnly = schema.fields.forall { f =>
+ f.name == options.valueTag || f.name.startsWith(options.attributePrefix)
+ }
+ Some(convertObject(parser, schema, options, rootAttributes, isRootAttributesOnly))
} catch {
case e: SparkUpgradeException => throw e
case e@(_: RuntimeException | _: XMLStreamException | _: MalformedInputException
@@ -305,7 +310,8 @@ class StaxXmlParser(
parser: XMLEventReader,
schema: StructType,
options: XmlOptions,
- rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
+ rootAttributes: Array[Attribute] = Array.empty,
+ isRootAttributesOnly: Boolean = false): InternalRow = {
val row = new Array[Any](schema.length)
val nameToIndex = schema.map(_.name).zipWithIndex.toMap
// If there are attributes, then we process them first.
@@ -371,6 +377,13 @@ class StaxXmlParser(
badRecordException = badRecordException.orElse(Some(e))
}
+ case c: Characters if !c.isWhiteSpace && isRootAttributesOnly =>
+ nameToIndex.get(options.valueTag) match {
+ case Some(index) =>
+ row(index) = convertTo(c.getData, schema(index).dataType, options)
+ case None => // do nothing
+ }
+
case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
index 3eabf4525b4e..8bddb8f5bd99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
@@ -219,12 +219,28 @@ private[sql] object XmlInferSchema {
dataTypes += inferredType
nameToDataType += (field -> dataTypes)
+ case c: Characters if !c.isWhiteSpace =>
+ // This can be an attribute-only object
+ val valueTagType = inferFrom(c.getData, options)
+ nameToDataType += options.valueTag -> ArrayBuffer(valueTagType)
+
case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
case _ => // do nothing
}
}
+ // A structure object is an attribute-only element
+ // if it only consists of attributes and valueTags.
+ // If not, we will remove the valueTag field from the schema
+ val attributesOnly = nameToDataType.forall {
+ case (fieldName, dataTypes) =>
+ dataTypes.length == 1 &&
+ (fieldName == options.valueTag || fieldName.startsWith(options.attributePrefix))
+ }
+ if (!attributesOnly) {
+ nameToDataType -= options.valueTag
+ }
// We need to manually merges the fields having the sames so that
// This can be inferred as ArrayType.
nameToDataType.foreach {
diff --git a/sql/core/src/test/resources/test-data/xml-resources/root-level-value-none.xml b/sql/core/src/test/resources/test-data/xml-resources/root-level-value-none.xml
new file mode 100644
index 000000000000..4b1a88baa4fd
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/xml-resources/root-level-value-none.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0"?>
+<ROWSET>
+ <ROW>value1</ROW>
+ <ROW attr="attr1">value2</ROW>
+ <ROW>4<tag>5</tag></ROW>
+ <ROW><tag>6</tag>7</ROW>
+ <ROW attr="8"></ROW>
+</ROWSET>
diff --git a/sql/core/src/test/resources/test-data/xml-resources/root-level-value.xml b/sql/core/src/test/resources/test-data/xml-resources/root-level-value.xml
new file mode 100644
index 000000000000..bd3c438bdf96
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/xml-resources/root-level-value.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0"?>
+<ROWSET>
+ <ROW>value1</ROW>
+ <ROW attr="attr1">value2</ROW>
+ <ROW>
+ value3
+ <!-- this is a comment-->
+ </ROW>
+</ROWSET>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index aed0939f04d3..c5892abf3f8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -1645,4 +1645,79 @@ class XmlSuite extends QueryTest with SharedSparkSession {
.xml(getTestResourcePath(resDir + "fias_house.xml"))
assert(df.collect().length === 37)
}
+
+ test("SPARK-45488: root-level value tag for attributes-only object") {
+ val schema = buildSchema(field("_attr"), field("_VALUE"))
+ val results = Seq(
+ // user specified schema
+ spark.read
+ .schema(schema)
+ .xml(getTestResourcePath(resDir + "root-level-value.xml")).collect(),
+ // schema inference
+ spark.read
+ .xml(getTestResourcePath(resDir + "root-level-value.xml")).collect())
+ results.foreach { result =>
+ assert(result.length === 3)
+ assert(result(0).getAs[String]("_VALUE") == "value1")
+ assert(result(1).getAs[String]("_attr") == "attr1"
+ && result(1).getAs[String]("_VALUE") == "value2")
+ // comments aren't included in valueTag
+ assert(result(2).getAs[String]("_VALUE") == "\n value3\n ")
+ }
+ }
+
+ test("SPARK-45488: root-level value tag for not attributes-only object") {
+ val ATTRIBUTE_NAME = "_attr"
+ val TAG_NAME = "tag"
+ val VALUETAG_NAME = "_VALUE"
+ val schema = buildSchema(
+ field(ATTRIBUTE_NAME),
+ field(TAG_NAME, LongType),
+ field(VALUETAG_NAME))
+ val dfs = Seq(
+ // user specified schema
+ spark.read
+ .schema(schema)
+ .xml(getTestResourcePath(resDir + "root-level-value-none.xml")),
+ // schema inference
+ spark.read
+ .xml(getTestResourcePath(resDir + "root-level-value-none.xml"))
+ )
+ dfs.foreach { df =>
+ val result = df.collect()
+ assert(result.length === 5)
+ assert(result(0).get(0) == null && result(0).get(1) == null)
+ assert(
+ result(1).getAs[String](ATTRIBUTE_NAME) == "attr1"
+ && result(1).getAs[Any](TAG_NAME) == null
+ )
+ assert(
+ result(2).getAs[Long](TAG_NAME) == 5L
+ && result(2).getAs[Any](ATTRIBUTE_NAME) == null
+ )
+ assert(
+ result(3).getAs[Long](TAG_NAME) == 6L
+ && result(3).getAs[Any](ATTRIBUTE_NAME) == null
+ )
+ assert(
+ result(4).getAs[String](ATTRIBUTE_NAME) == "8"
+ && result(4).getAs[Any](TAG_NAME) == null
+ )
+ }
+ }
+
+ test("SPARK-45488: root-level value tag for attributes-only object - from xml") {
+ val xmlData = """<ROW attr="attr1">123456</ROW>"""
+ val df = Seq((1, xmlData)).toDF("number", "payload")
+ val xmlSchema = schema_of_xml(xmlData)
+ val schema = buildSchema(
+ field("_VALUE", LongType),
+ field("_attr"))
+ val expectedSchema = df.schema.add("decoded", schema)
+ val result = df.withColumn("decoded",
+ from_xml(df.col("payload"), xmlSchema, Map[String, String]().asJava))
+ assert(expectedSchema == result.schema)
+ assert(result.select("decoded._VALUE").head().getLong(0) === 123456L)
+ assert(result.select("decoded._attr").head().getString(0) === "attr1")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]