This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e56956860 fix: close CloseableIterable owners in Iceberg read paths (#5149)
5e56956860 is described below

commit 5e569568606a204070040b1521210cc9d853bc10
Author: Meng Wang <[email protected]>
AuthorDate: Sat May 23 00:04:20 2026 -0700

    fix: close CloseableIterable owners in Iceberg read paths (#5149)
    
    ### What changes were proposed in this PR?
    
    Fixes a resource leak in `IcebergUtil.readDataFileAsIterator` and five
    sibling sites in `IcebergDocument` that share the same anti-pattern:
    
    ```scala
    closeableIterable.iterator().asScala
    ```
    
    The bare `Iterator` returned to callers held no reference to its parent
    `CloseableIterable`, so the parent could never be closed. Under
    `S3FileIO`:
    
    1. Every read leaked one `S3InputStream` (kept open until GC because
    nothing in the call graph could close it).
    2. The leaked stream had already borrowed one slot from the AWS SDK's
    `ApacheHttpClient` connection pool (default size **50**; Texera does
    not override it).
    3. After ~50 leaked reads the pool may saturate; new S3 reads then block
    on `acquireConnection` until JVM restart.
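    
    A minimal sketch of why the bare iterator leaks. It uses a hypothetical
    stand-in class (`FakeCloseableIterable`) instead of Iceberg's real
    `CloseableIterable`, and the object and method names are illustrative
    assumptions rather than code from this PR:
    
    ```scala
    import scala.jdk.CollectionConverters._

    object LeakSketch {
      // Hypothetical stand-in for a closeable iterable; not Iceberg's class.
      final class FakeCloseableIterable[T](items: Seq[T])
          extends java.lang.Iterable[T]
          with AutoCloseable {
        var closed = false
        override def iterator(): java.util.Iterator[T] = items.iterator.asJava
        override def close(): Unit = { closed = true }
      }

      // The anti-pattern: only the bare Scala iterator escapes, so the
      // caller has no handle on which to call close().
      def readLeaky(source: FakeCloseableIterable[Int]): Iterator[Int] =
        source.iterator().asScala

      val source = new FakeCloseableIterable[Int](Seq(1, 2, 3))
      val total: Int = readLeaky(source).sum  // iteration itself works fine
      val stillOpen: Boolean = !source.closed // nothing ever closed the parent
    }
    ```
    
    Iteration succeeds, which is why the leak is easy to miss: the parent
    is simply never closed.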
    
    This PR:
    
    - Introduces `CloseableScalaIterator[T]` (`Iterator[T] with
    AutoCloseable`, idempotent `close()`) in `IcebergUtil`, which wraps a
    `CloseableIterable[T]` and propagates `close()` to the parent.
    - Changes `IcebergUtil.readDataFileAsIterator` to return
    `CloseableScalaIterator[Record]` instead of bare `Iterator[Record]`.
    Callers must now close it (e.g. via `Using.resource`).
    - Updates the single caller in `IcebergDocument`'s read iterator to
    track the close handle in a sibling `AutoCloseable` field
    (`currentRecordIteratorCloser`) and close it on file-switch, on
    exhaustion, and on caller-imposed `until` cap. The sibling field is
    necessary because `Iterator.drop(n)` returns a bare iterator that loses
    the wrapper type.
    - Wraps the four eagerly-consumed `planFiles()` call sites — `getCount`,
    `seekToUsableFile`, `getTableStatistics`, `asInputStream` — in
    `Using.resource` so the metadata-side `CloseableIterable<FileScanTask>`
    is closed promptly.
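    
    The wrapper idea described above can be sketched roughly as follows.
    This is an illustration under stated assumptions, not the committed
    implementation: `ClosingIterator`, `CloseableSource`, and
    `CountingSource` are hypothetical names, and a plain
    `java.lang.Iterable with AutoCloseable` stands in for Iceberg's
    `CloseableIterable`:
    
    ```scala
    import scala.jdk.CollectionConverters._
    import scala.util.Using

    object CloseableIteratorSketch {
      // Hypothetical stand-in type for a closeable Java iterable.
      type CloseableSource[T] = java.lang.Iterable[T] with AutoCloseable

      // Illustrative wrapper: a Scala iterator that forwards close() to
      // its parent exactly once.
      final class ClosingIterator[T](parent: CloseableSource[T])
          extends Iterator[T]
          with AutoCloseable {
        private val underlying = parent.iterator().asScala
        private var closed = false
        override def hasNext: Boolean = underlying.hasNext
        override def next(): T = underlying.next()
        override def close(): Unit =
          if (!closed) { closed = true; parent.close() }
      }

      // Test double that counts how often close() reaches the parent.
      final class CountingSource(items: Seq[Int])
          extends java.lang.Iterable[Int]
          with AutoCloseable {
        var closeCalls = 0
        override def iterator(): java.util.Iterator[Int] = items.iterator.asJava
        override def close(): Unit = { closeCalls += 1 }
      }

      // Eager consumption: Using.resource closes the wrapper on exit.
      val eagerSource = new CountingSource(Seq(1, 2, 3))
      val total: Int = Using.resource(new ClosingIterator[Int](eagerSource))(_.sum)

      // Lazy consumption: drop() returns a plain Iterator[Int], so the
      // close handle is kept in a separate AutoCloseable reference.
      val lazySource = new CountingSource(Seq(1, 2, 3))
      val wrapper = new ClosingIterator[Int](lazySource)
      val closer: AutoCloseable = wrapper
      val rest: List[Int] = wrapper.drop(1).toList
      closer.close()
      closer.close() // second call is a no-op
    }
    ```
    
    Keeping `closer` separate mirrors the reason given for the sibling
    field: the result of `drop` no longer exposes `close()`.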
    
    **Known limitation (out of scope here):** if a caller of
    `IcebergDocument.get()` / `getRange()` / `getAfter()` stops iterating
    before `hasNext` returns `false` (e.g. throws mid-loop, or calls
    `.take(n)` and then drops the result), the LAST file's
    `CloseableScalaIterator` will leak until JVM GC. Fixing this requires
    changing the public `Iterator[T]` return type on `VirtualDocument` to
    `Iterator[T] with AutoCloseable` and updating all callers — best done as
    a separate refactor.
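    
    The limitation can be illustrated with a small, self-contained
    sketch. `TrackedReader` is a hypothetical class that, like the
    patched read iterator, releases its resource only when `hasNext`
    observes exhaustion; it is not code from this PR:
    
    ```scala
    object EarlyStopSketch {
      // Hypothetical reader: `open` flips to false only when hasNext
      // sees that no records remain.
      final class TrackedReader(items: Seq[Int]) extends Iterator[Int] {
        private val underlying = items.iterator
        var open = true
        override def hasNext: Boolean = {
          val more = underlying.hasNext
          if (!more) open = false
          more
        }
        override def next(): Int = underlying.next()
      }

      // Fully drained: the final hasNext returns false and releases the resource.
      val drained = new TrackedReader(Seq(1, 2, 3))
      val all: List[Int] = drained.toList

      // Abandoned after two records: hasNext never returns false, so
      // nothing triggers the release.
      val abandoned = new TrackedReader(Seq(1, 2, 3))
      val firstTwo: List[Int] = List(abandoned.next(), abandoned.next())
    }
    ```
    
    In this sketch `drained.open` ends up `false` while `abandoned.open`
    stays `true`, which is the shape of the leak described above.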
    
    ### Any related issues, documentation, discussions?
    
    Closes #5143.
    
    ### How was this PR tested?
    
    - Added `IcebergUtilLeakSpec` (2 cases): validates that
    `CloseableScalaIterator` (a) closes its parent `CloseableIterable` when
    used inside `Using.resource`, and (b) is idempotent under repeated
    `close()` calls.
    - All existing Iceberg specs still pass:
      - `IcebergUtilSpec`: 14/14
      - `IcebergUtilLeakSpec`: 2/2 (new)
      - `IcebergDocumentSpec`: 18/18 (exercises the modified read iterator's
        close-on-reassign / close-on-exhaustion paths against real Iceberg
        infrastructure)
      - `IcebergTableStatsSpec`: 12/12 (exercises `getTableStatistics` with
        the new `Using.resource` wrap)
      - `IcebergDocumentConsoleMessagesSpec`: 1/1
    
    Run locally:
    
    ```
    sbt "WorkflowCore/testOnly org.apache.texera.amber.util.IcebergUtilSpec org.apache.texera.amber.util.IcebergUtilLeakSpec org.apache.texera.amber.storage.result.iceberg.*"
    ```
    
    Result: `47 succeeded, 0 failed`.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../storage/result/iceberg/IcebergDocument.scala   | 245 ++++++++++++---------
 .../org/apache/texera/amber/util/IcebergUtil.scala |  27 ++-
 .../result/iceberg/IcebergDocumentSpec.scala       |  21 ++
 3 files changed, 175 insertions(+), 118 deletions(-)

diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
index e10152cdae..182da2baac 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -125,7 +125,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
       .getOrElse(
         return 0
       )
-    table.newScan().planFiles().iterator().asScala.map(f => f.file().recordCount()).sum
+    Using.resource(table.newScan().planFiles()) { tasks =>
+      tasks.iterator().asScala.map(f => f.file().recordCount()).sum
+    }
   }
 
   /**
@@ -178,8 +180,11 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
         // Iterator for usable file scan tasks
         private var usableFileIterator: Iterator[FileScanTask] = seekToUsableFile()
 
-        // Current record iterator for the active file
+        // Active file's records. Closer tracked separately because the
+        // `.drop` call below returns a bare `Iterator[Record]` that loses
+        // the wrapper type.
         private var currentRecordIterator: Iterator[Record] = Iterator.empty
+        private var currentRecordIteratorCloser: AutoCloseable = () => ()
 
         // Util function to load the table's metadata
         private def loadTableMetadata(): Option[Table] = {
@@ -202,38 +207,43 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
             }
             table.foreach(_.refresh())
 
-            // Retrieve and sort the file scan tasks by file sequence number
+            // Retrieve and sort the file scan tasks by file sequence number.
+            // Materialize inside `Using.resource` so the `planFiles()`
+            // CloseableIterable is released after collection.
             val fileScanTasksIterator: Iterator[FileScanTask] = table match {
               case Some(t) =>
                 val currentSnapshotId = Option(t.currentSnapshot()).map(_.snapshotId())
-                val fileScanTasks = (lastSnapshotId, currentSnapshotId) match {
-                  // Read from the start
-                  case (None, Some(_)) =>
-                    val tasks = t.newScan().planFiles().iterator().asScala
-                    lastSnapshotId = currentSnapshotId
-                    tasks
-
-                  // Read incrementally from the last snapshot
-                  case (Some(lastId), Some(currId)) if lastId != currId =>
-                    val tasks = t
-                      .newIncrementalAppendScan()
-                      .fromSnapshotExclusive(lastId)
-                      .toSnapshot(currId)
-                      .planFiles()
-                      .iterator()
-                      .asScala
-                    lastSnapshotId = currentSnapshotId
-                    tasks
-
-                  // No new data
-                  case (Some(lastId), Some(currId)) if lastId == currId =>
-                    Iterator.empty
-
-                  // Default: No data yet
-                  case _ =>
-                    Iterator.empty
-                }
-                fileScanTasks.toSeq.sortBy(_.file().fileSequenceNumber()).iterator
+                val fileScanTasks: Seq[FileScanTask] =
+                  (lastSnapshotId, currentSnapshotId) match {
+                    // Read from the start
+                    case (None, Some(_)) =>
+                      val tasks = Using.resource(t.newScan().planFiles()) { ci =>
+                        ci.iterator().asScala.toSeq
+                      }
+                      lastSnapshotId = currentSnapshotId
+                      tasks
+
+                    // Read incrementally from the last snapshot
+                    case (Some(lastId), Some(currId)) if lastId != currId =>
+                      val tasks = Using.resource(
+                        t
+                          .newIncrementalAppendScan()
+                          .fromSnapshotExclusive(lastId)
+                          .toSnapshot(currId)
+                          .planFiles()
+                      ) { ci => ci.iterator().asScala.toSeq }
+                      lastSnapshotId = currentSnapshotId
+                      tasks
+
+                    // No new data
+                    case (Some(lastId), Some(currId)) if lastId == currId =>
+                      Seq.empty
+
+                    // Default: No data yet
+                    case _ =>
+                      Seq.empty
+                  }
+                fileScanTasks.sortBy(_.file().fileSequenceNumber()).iterator
 
               case None =>
                 Iterator.empty
@@ -255,6 +265,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
 
         override def hasNext: Boolean = {
           if (numOfReturnedRecords >= totalRecordsToReturn) {
+            // Caller-imposed limit reached; release the active file's reader.
+            currentRecordIteratorCloser.close()
+            currentRecordIteratorCloser = () => ()
             return false
           }
 
@@ -274,11 +287,15 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
               case Some(cols) => tableSchema.select(cols.asJava)
               case None       => tableSchema
             }
-            currentRecordIterator = IcebergUtil.readDataFileAsIterator(
+            // Release the prior file's reader before opening the next.
+            currentRecordIteratorCloser.close()
+            val nextIter = IcebergUtil.readDataFileAsIterator(
               nextFile.file(),
               schemaToUse,
               table.get
             )
+            currentRecordIteratorCloser = nextIter
+            currentRecordIterator = nextIter.asScala
 
             // Skip records within the file if necessary
             val recordsToSkipInFile = from - numOfSkippedRecords
@@ -288,7 +305,13 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
             }
           }
 
-          currentRecordIterator.hasNext
+          val hasMore = currentRecordIterator.hasNext
+          if (!hasMore) {
+            // All files exhausted; release the last file's reader.
+            currentRecordIteratorCloser.close()
+            currentRecordIteratorCloser = () => ()
+          }
+          hasMore
         }
 
         override def next(): T = {
@@ -355,83 +378,89 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
     }
 
     // Scan table files and aggregate statistics
-    table.newScan().includeColumnStats().planFiles().iterator().asScala.foreach { file =>
-      val fileStats = file.file()
-      // Extract column-level statistics
-      val lowerBounds =
-        Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
-      val upperBounds =
-        Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
-      val nullCounts =
-        Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
-      val nanCounts =
-        Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
-
-      fieldTypes.foreach {
-        case (field, (fieldId, fieldType)) =>
-          val lowerBound = Option(lowerBounds.get(fieldId))
-          val upperBound = Option(upperBounds.get(fieldId))
-          val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
-          val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
-          val fieldStat = fieldStats(field)
-
-          // Process min/max values for numerical types
-          if (
-            fieldType == Types.IntegerType.get() || fieldType == Types.LongType
-              .get() || fieldType == Types.DoubleType.get()
-          ) {
-            lowerBound.foreach { buffer =>
-              val minValue =
-                Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
-              fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue)
-            }
+    Using.resource(table.newScan().includeColumnStats().planFiles()) { tasks =>
+      tasks.iterator().asScala.foreach { file =>
+        val fileStats = file.file()
+        // Extract column-level statistics
+        val lowerBounds =
+          Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
+        val upperBounds =
+          Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
+        val nullCounts =
+          Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
+        val nanCounts =
+          Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
+
+        fieldTypes.foreach {
+          case (field, (fieldId, fieldType)) =>
+            val lowerBound = Option(lowerBounds.get(fieldId))
+            val upperBound = Option(upperBounds.get(fieldId))
+            val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
+            val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
+            val fieldStat = fieldStats(field)
+
+            // Process min/max values for numerical types
+            if (
+              fieldType == Types.IntegerType.get() || fieldType == Types.LongType
+                .get() || fieldType == Types.DoubleType.get()
+            ) {
+              lowerBound.foreach { buffer =>
+                val minValue =
+                  Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
+                fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue)
+              }
 
-            upperBound.foreach { buffer =>
-              val maxValue =
-                Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
-              fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue)
-            }
-          }
-          // Process min/max values for timestamp types
-          else if (
-            fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
-              .withZone()
-          ) {
-            lowerBound.foreach { buffer =>
-              val epochMicros = Conversions
-                .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
-                .asInstanceOf[Long]
-              val dateValue =
-                Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
-              fieldStat("min") =
-                if (
-                  dateValue
-                    .isBefore(LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter))
-                )
-                  dateValue.format(dateFormatter)
-                else
-                  fieldStat("min")
+              upperBound.foreach { buffer =>
+                val maxValue =
+                  Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
+                fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue)
+              }
             }
+            // Process min/max values for timestamp types
+            else if (
+              fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
+                .withZone()
+            ) {
+              lowerBound.foreach { buffer =>
+                val epochMicros = Conversions
+                  .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
+                  .asInstanceOf[Long]
+                val dateValue =
+                  Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
+                fieldStat("min") =
+                  if (
+                    dateValue
+                      .isBefore(
+                        LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter)
+                      )
+                  )
+                    dateValue.format(dateFormatter)
+                  else
+                    fieldStat("min")
+              }
 
-            upperBound.foreach { buffer =>
-              val epochMicros = Conversions
-                .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
-                .asInstanceOf[Long]
-              val dateValue =
-                Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
-              fieldStat("max") =
-                if (
-                  dateValue
-                    .isAfter(LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter))
-                )
-                  dateValue.format(dateFormatter)
-                else
-                  fieldStat("max")
+              upperBound.foreach { buffer =>
+                val epochMicros = Conversions
+                  .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
+                  .asInstanceOf[Long]
+                val dateValue =
+                  Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
+                fieldStat("max") =
+                  if (
+                    dateValue
+                      .isAfter(
+                        LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter)
+                      )
+                  )
+                    dateValue.format(dateFormatter)
+                  else
+                    fieldStat("max")
+              }
             }
-          }
-          // Update non-null count
-          fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] +
-            (fileStats.recordCount().toLong - nullCount - nanCount)
+            // Update non-null count
+            fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] +
+              (fileStats.recordCount().toLong - nullCount - nanCount)
+        }
       }
     }
     fieldStats.map {
@@ -478,7 +507,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
     val fileScanTasks: Seq[FileScanTask] = {
       val table = this.catalog.loadTable(TableIdentifier.of(this.tableNamespace, this.tableName))
       table.refresh()
-      table.newScan().planFiles().iterator().asScala.toSeq
+      Using.resource(table.newScan().planFiles()) { tasks =>
+        tasks.iterator().asScala.toSeq
+      }
     }
 
     if (fileScanTasks.isEmpty) {
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
index cee293a758..0b45b9eec3 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
@@ -27,7 +27,7 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders
 import org.apache.iceberg.data.{GenericRecord, Record}
 import org.apache.iceberg.aws.s3.S3FileIO
 import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO}
-import org.apache.iceberg.io.{CloseableIterable, InputFile}
+import org.apache.iceberg.io.{CloseableIterator, InputFile}
 import org.apache.iceberg.jdbc.JdbcCatalog
 import org.apache.iceberg.parquet.{Parquet, ParquetValueReader}
 import org.apache.iceberg.rest.RESTCatalog
@@ -404,17 +404,23 @@ object IcebergUtil {
   }
 
   /**
-    * Util function to create a Record iterator over the given DataFile in Iceberg
+    * Returns a Record iterator over the given Iceberg DataFile.
+    *
+    * The returned `CloseableIterator` (Iceberg's iterator type) owns the
+    * underlying Parquet reader / S3InputStream / AWS HTTP-pool slot. The
+    * caller MUST close it once iteration is finished, otherwise those
+    * resources are leaked.
+    *
     * @param dataFile the data file
     * @param schema the schema of the table
     * @param table the iceberg table
-    * @return an iterator over the records in the data file
+    * @return a closeable iterator over the records in the data file
     */
   def readDataFileAsIterator(
       dataFile: DataFile,
       schema: IcebergSchema,
       table: Table
-  ): Iterator[Record] = {
+  ): CloseableIterator[Record] = {
     val inputFile: InputFile = table.io().newInputFile(dataFile)
     val readerFunc
         : java.util.function.Function[org.apache.parquet.schema.MessageType, ParquetValueReader[
@@ -422,13 +428,12 @@ object IcebergUtil {
         ]] =
       (messageType: org.apache.parquet.schema.MessageType) =>
         GenericParquetReaders.buildReader(schema, messageType)
-    val closeableIterable: CloseableIterable[Record] =
-      Parquet
-        .read(inputFile)
-        .project(schema)
-        .createReaderFunc(readerFunc)
-        .build()
-    closeableIterable.iterator().asScala
+    Parquet
+      .read(inputFile)
+      .project(schema)
+      .createReaderFunc(readerFunc)
+      .build()
+      .iterator()
   }
 
 }
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 6184ce8dcd..0e9b2ae68a 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -282,6 +282,27 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     )
   }
 
+  it should "expose written rows as a non-empty ZIP via asInputStream" in {
+    val items = generateSampleItems().take(3)
+    val writer = document.writer(UUID.randomUUID().toString)
+    writer.open()
+    items.foreach(writer.putOne)
+    writer.close()
+
+    val stream = document.asInputStream()
+    try {
+      val bytes = stream.readAllBytes()
+      assert(bytes.nonEmpty, "asInputStream should yield non-empty bytes after writes")
+      // ZIP local-file-header magic bytes: 0x50 0x4B 0x03 0x04 ("PK\x03\x04").
+      assert(
+        bytes(0) == 0x50.toByte && bytes(1) == 0x4b.toByte,
+        "expected ZIP magic bytes at the start of the stream"
+      )
+    } finally {
+      stream.close()
+    }
+  }
+
   /** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */
   private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table =
     Proxy
