This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fc5868791a5 [SPARK-46355][SQL] XML: Close InputStreamReader on read
completion
fc5868791a5 is described below
commit fc5868791a5d261fb416b376cc6eb39e8e29b90a
Author: Sandip Agarwala <[email protected]>
AuthorDate: Mon Dec 11 09:41:03 2023 -0800
[SPARK-46355][SQL] XML: Close InputStreamReader on read completion
### What changes were proposed in this pull request?
The XML InputStreamReader needs to be closed on read completion to release
InputStream resources in a timely manner.
### Why are the changes needed?
Not closing the reader may delay recycling of the underlying InputStream
connection resources for cloud objects.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test and manual testing
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44287 from sandip-db/xml-reader-close.
Authored-by: Sandip Agarwala <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/catalyst/xml/StaxXmlParser.scala | 21 +++++++++++++++------
.../spark/sql/catalyst/xml/XmlInferSchema.scala | 4 +++-
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 567074bbf12..373ef410f05 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -133,7 +133,9 @@ class StaxXmlParser(
val isRootAttributesOnly = schema.fields.forall { f =>
f.name == options.valueTag ||
f.name.startsWith(options.attributePrefix)
}
- Some(convertObject(parser, schema, rootAttributes, isRootAttributesOnly))
+ val result = Some(convertObject(parser, schema, rootAttributes,
isRootAttributesOnly))
+ parser.close()
+ result
} catch {
case e: SparkUpgradeException => throw e
case e@(_: RuntimeException | _: XMLStreamException | _:
MalformedInputException
@@ -576,7 +578,7 @@ class StaxXmlParser(
class XmlTokenizer(
inputStream: InputStream,
options: XmlOptions) {
- private val reader = new InputStreamReader(inputStream,
Charset.forName(options.charset))
+ private var reader = new InputStreamReader(inputStream,
Charset.forName(options.charset))
private var currentStartTag: String = _
private var buffer = new StringBuilder()
private val startTag = s"<${options.rowTag}>"
@@ -591,17 +593,24 @@ class XmlTokenizer(
* @return whether it reads successfully
*/
def next(): Option[String] = {
- if (readUntilStartElement()) {
- try {
+ try {
+ if (readUntilStartElement()) {
buffer.append(currentStartTag)
// Don't check whether the end element was found. Even if not, return
everything
// that was read, which will invariably cause a parse error later
readUntilEndElement(currentStartTag.endsWith(">"))
- return Some(buffer.toString())
- } finally {
+ val str = buffer.toString()
buffer = new StringBuilder()
+ return Some(str)
}
+ } catch {
+ case e: Throwable =>
+ reader.close()
+ reader = null
+ throw e
}
+ reader.close()
+ reader = null
None
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
index f81c476cd38..52c98bc43eb 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
@@ -118,7 +118,9 @@ class XmlInferSchema(options: XmlOptions, caseSensitive:
Boolean)
}
val parser = StaxXmlParserUtils.filteredReader(xml)
val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
- Some(inferObject(parser, rootAttributes))
+ val schema = Some(inferObject(parser, rootAttributes))
+ parser.close()
+ schema
} catch {
case NonFatal(_) if options.parseMode == PermissiveMode =>
Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord,
StringType))))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]