This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new fc5868791a5 [SPARK-46355][SQL] XML: Close InputStreamReader on read completion fc5868791a5 is described below commit fc5868791a5d261fb416b376cc6eb39e8e29b90a Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> AuthorDate: Mon Dec 11 09:41:03 2023 -0800 [SPARK-46355][SQL] XML: Close InputStreamReader on read completion ### What changes were proposed in this pull request? The XML InputStreamReader needs to be closed on read completion to release InputStream resources promptly. ### Why are the changes needed? Not closing the reader may delay the recycling of the underlying InputStream connection resources for cloud objects. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test and manual testing ### Was this patch authored or co-authored using generative AI tooling? No Closes #44287 from sandip-db/xml-reader-close. Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/catalyst/xml/StaxXmlParser.scala | 21 +++++++++++++++------ .../spark/sql/catalyst/xml/XmlInferSchema.scala | 4 +++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala index 567074bbf12..373ef410f05 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala @@ -133,7 +133,9 @@ class StaxXmlParser( val isRootAttributesOnly = schema.fields.forall { f => f.name == options.valueTag || f.name.startsWith(options.attributePrefix) } - Some(convertObject(parser, schema, rootAttributes, isRootAttributesOnly)) + val result = Some(convertObject(parser, schema, rootAttributes, isRootAttributesOnly)) + 
parser.close() + result } catch { case e: SparkUpgradeException => throw e case e@(_: RuntimeException | _: XMLStreamException | _: MalformedInputException @@ -576,7 +578,7 @@ class StaxXmlParser( class XmlTokenizer( inputStream: InputStream, options: XmlOptions) { - private val reader = new InputStreamReader(inputStream, Charset.forName(options.charset)) + private var reader = new InputStreamReader(inputStream, Charset.forName(options.charset)) private var currentStartTag: String = _ private var buffer = new StringBuilder() private val startTag = s"<${options.rowTag}>" @@ -591,17 +593,24 @@ class XmlTokenizer( * @return whether it reads successfully */ def next(): Option[String] = { - if (readUntilStartElement()) { - try { + try { + if (readUntilStartElement()) { buffer.append(currentStartTag) // Don't check whether the end element was found. Even if not, return everything // that was read, which will invariably cause a parse error later readUntilEndElement(currentStartTag.endsWith(">")) - return Some(buffer.toString()) - } finally { + val str = buffer.toString() buffer = new StringBuilder() + return Some(str) } + } catch { + case e: Throwable => + reader.close() + reader = null + throw e } + reader.close() + reader = null None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala index f81c476cd38..52c98bc43eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala @@ -118,7 +118,9 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean) } val parser = StaxXmlParserUtils.filteredReader(xml) val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser) - Some(inferObject(parser, rootAttributes)) + val schema = Some(inferObject(parser, rootAttributes)) + parser.close() + schema } catch { case 
NonFatal(_) if options.parseMode == PermissiveMode => Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, StringType)))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org