This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 69239729f35e [SPARK-46954][SQL] XML: Optimize schema index lookup
69239729f35e is described below
commit 69239729f35e8fb88e64747bc3ab2cf66938e76b
Author: Sandip Agarwala <[email protected]>
AuthorDate: Mon Feb 5 14:47:39 2024 +0900
[SPARK-46954][SQL] XML: Optimize schema index lookup
### What changes were proposed in this pull request?
Optimize schema field index lookup in the XML parser
### Why are the changes needed?
10%-15% performance improvement
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44997 from sandip-db/xml_schema_index.
Lead-authored-by: Sandip Agarwala
<[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/types/StructType.scala | 8 +++++++-
.../spark/sql/catalyst/xml/StaxXmlParser.scala | 20 +++++++++-----------
2 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/StructType.scala
b/sql/api/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 04ae31429118..aac2f1cd737e 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.analysis.SqlApiAnalysis
import org.apache.spark.sql.catalyst.parser.{DataTypeParser,
LegacyTypeStringParser}
import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.sql.catalyst.util.{SparkStringUtils, StringConcat}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
SparkStringUtils, StringConcat}
import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.internal.SqlApiConf
import org.apache.spark.util.SparkCollectionUtils
@@ -118,6 +118,8 @@ case class StructType(fields: Array[StructField]) extends
DataType with Seq[Stru
private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
private lazy val nameToField: Map[String, StructField] = fields.map(f =>
f.name -> f).toMap
private lazy val nameToIndex: Map[String, Int] =
SparkCollectionUtils.toMapWithIndex(fieldNames)
+ private lazy val nameToIndexCaseInsensitive: CaseInsensitiveMap[Int] =
+ CaseInsensitiveMap[Int](nameToIndex.toMap)
override def equals(that: Any): Boolean = {
that match {
@@ -315,6 +317,10 @@ case class StructType(fields: Array[StructField]) extends
DataType with Seq[Stru
nameToIndex.get(name)
}
+ private[sql] def getFieldIndexCaseInsensitive(name: String): Option[Int] = {
+ nameToIndexCaseInsensitive.get(name)
+ }
+
/**
* Returns the normalized path to a field and the field in this struct and
its child structs.
*
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 674d5f63b039..74413bb8cbb2 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{
ArrayBasedMapData,
BadRecordException,
- CaseInsensitiveMap,
DateFormatter,
DropMalformedMode,
FailureSafeParser,
@@ -101,11 +100,11 @@ class StaxXmlParser(
}
}
- private def getFieldNameToIndex(schema: StructType): Map[String, Int] = {
+ private def getFieldIndex(schema: StructType, fieldName: String):
Option[Int] = {
if (caseSensitive) {
- schema.map(_.name).zipWithIndex.toMap
+ schema.getFieldIndex(fieldName)
} else {
- CaseInsensitiveMap(schema.map(_.name).zipWithIndex.toMap)
+ schema.getFieldIndexCaseInsensitive(fieldName)
}
}
@@ -292,8 +291,8 @@ class StaxXmlParser(
val convertedValuesMap = collection.mutable.Map.empty[String, Any]
val valuesMap =
StaxXmlParserUtils.convertAttributesToValuesMap(attributes, options)
valuesMap.foreach { case (f, v) =>
- val nameToIndex = getFieldNameToIndex(schema)
- nameToIndex.get(f).foreach { i =>
+ val indexOpt = getFieldIndex(schema, f)
+ indexOpt.foreach { i =>
convertedValuesMap(f) = convertTo(v, schema(i).dataType)
}
}
@@ -332,8 +331,8 @@ class StaxXmlParser(
// Here we merge both to a row.
val valuesMap = fieldsMap ++ attributesMap
valuesMap.foreach { case (f, v) =>
- val nameToIndex = getFieldNameToIndex(schema)
- nameToIndex.get(f).foreach { row(_) = v }
+ val indexOpt = getFieldIndex(schema, f)
+ indexOpt.foreach { row(_) = v }
}
if (valuesMap.isEmpty) {
@@ -353,11 +352,10 @@ class StaxXmlParser(
schema: StructType,
rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
val row = new Array[Any](schema.length)
- val nameToIndex = getFieldNameToIndex(schema)
// If there are attributes, then we process them first.
convertAttributes(rootAttributes, schema).toSeq.foreach {
case (f, v) =>
- nameToIndex.get(f).foreach { row(_) = v }
+ getFieldIndex(schema, f).foreach { row(_) = v }
}
val wildcardColName = options.wildcardColName
@@ -372,7 +370,7 @@ class StaxXmlParser(
val attributes = e.getAttributes.asScala.toArray
val field = StaxXmlParserUtils.getName(e.asStartElement.getName,
options)
- nameToIndex.get(field) match {
+ getFieldIndex(schema, field) match {
case Some(index) => schema(index).dataType match {
case st: StructType =>
row(index) = convertObjectWithAttributes(parser, st, field,
attributes)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]