This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc5868791a5 [SPARK-46355][SQL] XML: Close InputStreamReader on read completion
fc5868791a5 is described below

commit fc5868791a5d261fb416b376cc6eb39e8e29b90a
Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
AuthorDate: Mon Dec 11 09:41:03 2023 -0800

    [SPARK-46355][SQL] XML: Close InputStreamReader on read completion
    
    ### What changes were proposed in this pull request?
    The XML InputStreamReader needs to be closed on read completion to release the underlying InputStream resources in a timely manner.
    
    ### Why are the changes needed?
    Not closing the reader may delay the recycling of the underlying InputStream connection resources, e.g. connections to cloud object stores.
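    
    A minimal sketch of the close-on-completion pattern this change applies (illustrative only, not the Spark code itself; `readAll` is a hypothetical helper): the reader is closed as soon as reading finishes, so the wrapped InputStream, e.g. a cloud-object connection, is released promptly instead of waiting for GC/finalization.
    
    ```scala
    import java.io.{InputStream, InputStreamReader}
    import java.nio.charset.StandardCharsets
    
    // Hypothetical helper: read all characters, then close the reader in a
    // finally block so the underlying InputStream is released immediately.
    def readAll(in: InputStream): String = {
      val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
      try {
        val buf = new Array[Char](8192)
        val sb = new StringBuilder
        var n = reader.read(buf)
        while (n != -1) {
          sb.appendAll(buf, 0, n)
          n = reader.read(buf)
        }
        sb.toString()
      } finally {
        reader.close() // also closes the wrapped InputStream
      }
    }
    ```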
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test and manual testing
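    
    A hedged sketch of one way such a unit test could check the behaviour; the `CloseTrackingInputStream` wrapper and the XML payload are assumptions for illustration, not the actual test code:
    
    ```scala
    import java.io.{ByteArrayInputStream, InputStream}
    
    // Illustrative wrapper: records whether close() was called so a test can
    // assert that the stream was released once all rows were consumed.
    class CloseTrackingInputStream(underlying: InputStream) extends InputStream {
      @volatile var closed = false
      override def read(): Int = underlying.read()
      override def close(): Unit = { closed = true; underlying.close() }
    }
    
    val in = new CloseTrackingInputStream(
      new ByteArrayInputStream("<ROWS><ROW><a>1</a></ROW></ROWS>".getBytes("UTF-8")))
    // ... feed `in` to the XML reader under test and consume all rows ...
    assert(in.closed, "InputStream should be closed once the last row is read")
    ```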
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44287 from sandip-db/xml-reader-close.
    
    Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/xml/StaxXmlParser.scala      | 21 +++++++++++++++------
 .../spark/sql/catalyst/xml/XmlInferSchema.scala     |  4 +++-
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 567074bbf12..373ef410f05 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -133,7 +133,9 @@ class StaxXmlParser(
       val isRootAttributesOnly = schema.fields.forall { f =>
         f.name == options.valueTag || f.name.startsWith(options.attributePrefix)
       }
-      Some(convertObject(parser, schema, rootAttributes, isRootAttributesOnly))
+      val result = Some(convertObject(parser, schema, rootAttributes, isRootAttributesOnly))
+      parser.close()
+      result
     } catch {
       case e: SparkUpgradeException => throw e
       case e@(_: RuntimeException | _: XMLStreamException | _: MalformedInputException
@@ -576,7 +578,7 @@ class StaxXmlParser(
 class XmlTokenizer(
   inputStream: InputStream,
   options: XmlOptions) {
-  private val reader = new InputStreamReader(inputStream, Charset.forName(options.charset))
+  private var reader = new InputStreamReader(inputStream, Charset.forName(options.charset))
   private var currentStartTag: String = _
   private var buffer = new StringBuilder()
   private val startTag = s"<${options.rowTag}>"
@@ -591,17 +593,24 @@ class XmlTokenizer(
    * @return whether it reads successfully
    */
   def next(): Option[String] = {
-    if (readUntilStartElement()) {
-      try {
+    try {
+      if (readUntilStartElement()) {
         buffer.append(currentStartTag)
         // Don't check whether the end element was found. Even if not, return everything
         // that was read, which will invariably cause a parse error later
         readUntilEndElement(currentStartTag.endsWith(">"))
-        return Some(buffer.toString())
-      } finally {
+        val str = buffer.toString()
         buffer = new StringBuilder()
+        return Some(str)
       }
+    } catch {
+      case e: Throwable =>
+        reader.close()
+        reader = null
+        throw e
     }
+    reader.close()
+    reader = null
     None
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
index f81c476cd38..52c98bc43eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
@@ -118,7 +118,9 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean)
       }
       val parser = StaxXmlParserUtils.filteredReader(xml)
       val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
-      Some(inferObject(parser, rootAttributes))
+      val schema = Some(inferObject(parser, rootAttributes))
+      parser.close()
+      schema
     } catch {
       case NonFatal(_) if options.parseMode == PermissiveMode =>
         Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, StringType))))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
