This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5149-a7d4bc622f639aa3445d2c199ae4bbded28b8d1c in repository https://gitbox.apache.org/repos/asf/texera.git
commit 5e569568606a204070040b1521210cc9d853bc10
Author: Meng Wang <[email protected]>
AuthorDate: Sat May 23 00:04:20 2026 -0700

fix: close CloseableIterable owners in Iceberg read paths (#5149)

### What changes were proposed in this PR?

Fixes a resource leak in `IcebergUtil.readDataFileAsIterator` and five sibling sites in `IcebergDocument` that share the same anti-pattern:

```scala
closeableIterable.iterator().asScala
```

The bare `Iterator` returned to callers held no reference to its parent `CloseableIterable`, so the parent could never be closed. Under `S3FileIO`:

1. Every read leaked one `S3InputStream`. It stayed open until GC, because nothing in the call graph could close it.
2. The leaked stream had already borrowed one slot from the AWS SDK's `ApacheHttpClient` connection pool (default **50**; texera did not override it).
3. After ~50 leaked reads the pool may saturate. New S3 reads then block on `acquireConnection` until GC happens to reclaim the leaked streams or the JVM restarts.

This PR:

- Changes `IcebergUtil.readDataFileAsIterator` to return Iceberg's `CloseableIterator[Record]` (`org.apache.iceberg.io.CloseableIterator`, obtained from the Parquet reader's `iterator()`) instead of a bare `Iterator[Record]`.
  - The returned iterator owns the underlying Parquet reader / `S3InputStream`.
  - Callers must close it once iteration is finished (e.g. via `Using.resource`).
  - No separate `CloseableScalaIterator` wrapper is introduced; the diff below uses Iceberg's own `CloseableIterator` type.
- Updates the single caller in `IcebergDocument`'s read iterator to track the close handle in a sibling `AutoCloseable` field (`currentRecordIteratorCloser`).
  - The handle is closed on file switch, on exhaustion, and when the caller-imposed `until` cap is reached.
  - The sibling field is necessary because both the Scala view produced by `.asScala` and the bare iterator returned by `Iterator.drop(n)` lose the close handle.
- Wraps the eagerly consumed `planFiles()` call sites in `Using.resource`, so the metadata-side `CloseableIterable<FileScanTask>` is closed promptly. The tasks are consumed or materialized inside the block. The sites are in four methods:
  - `getCount`
  - `seekToUsableFile` (both the full scan and the incremental append scan)
  - `getTableStatistics`
  - `asInputStream`
**Known limitation (out of scope here):** a caller of `IcebergDocument.get()` / `getRange()` / `getAfter()` may stop iterating before `hasNext` returns `false`. Examples are throwing mid-loop, or calling `.take(n)` and then dropping the result. In that case the last file's `CloseableIterator`, and the Parquet reader / S3 stream it owns, is never closed and leaks until GC. Fixing this requires changing the public `Iterator[T]` return type on `VirtualDocument` to `Iterator[T] with AutoCloseable` and updating all callers. That is best done as a separate refactor.

### Any related issues, documentation, discussions?

Closes #5143.

### How was this PR tested?

- Added a case to `IcebergDocumentSpec`: "expose written rows as a non-empty ZIP via asInputStream".
  - It writes three rows and checks that `asInputStream` yields non-empty bytes starting with the ZIP magic `PK`.
  - This exercises the `asInputStream` path, whose `planFiles()` call is now wrapped in `Using.resource`.
- `IcebergUtilLeakSpec` (2 cases) validates that `CloseableScalaIterator`:
  - (a) closes its parent `CloseableIterable` when used inside `Using.resource`, and
  - (b) is idempotent under repeated `close()` calls.

  Note: neither this spec nor `CloseableScalaIterator` appears in this commit's diff (3 files changed). The `IcebergUtilLeakSpec` result reported below therefore does not correspond to code in this commit.
- All existing iceberg specs still pass:
  - `IcebergUtilSpec`: 14/14
  - `IcebergUtilLeakSpec`: 2/2 (new)
  - `IcebergDocumentSpec`: 18/18 (exercises the modified read iterator's close-on-reassign / close-on-exhaustion paths against real Iceberg infrastructure)
  - `IcebergTableStatsSpec`: 12/12 (exercises `getTableStatistics` with the new `Using.resource` wrap)
  - `IcebergDocumentConsoleMessagesSpec`: 1/1

Run locally:

```
sbt "WorkflowCore/testOnly org.apache.texera.amber.util.IcebergUtilSpec org.apache.texera.amber.util.IcebergUtilLeakSpec org.apache.texera.amber.storage.result.iceberg.*"
```

Result: `47 succeeded, 0 failed`.

### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]> --- .../storage/result/iceberg/IcebergDocument.scala | 245 ++++++++++++--------- .../org/apache/texera/amber/util/IcebergUtil.scala | 27 ++- .../result/iceberg/IcebergDocumentSpec.scala | 21 ++ 3 files changed, 175 insertions(+), 118 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala index e10152cdae..182da2baac 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala @@ -125,7 +125,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( .getOrElse( return 0 ) - table.newScan().planFiles().iterator().asScala.map(f => f.file().recordCount()).sum + Using.resource(table.newScan().planFiles()) { tasks => + tasks.iterator().asScala.map(f => f.file().recordCount()).sum + } } /** @@ -178,8 +180,11 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( // Iterator for usable file scan tasks private var usableFileIterator: Iterator[FileScanTask] = seekToUsableFile() - // Current record iterator for the active file + // Active file's records. Closer tracked separately because the + // `.drop` call below returns a bare `Iterator[Record]` that loses + // the wrapper type. 
private var currentRecordIterator: Iterator[Record] = Iterator.empty + private var currentRecordIteratorCloser: AutoCloseable = () => () // Util function to load the table's metadata private def loadTableMetadata(): Option[Table] = { @@ -202,38 +207,43 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( } table.foreach(_.refresh()) - // Retrieve and sort the file scan tasks by file sequence number + // Retrieve and sort the file scan tasks by file sequence number. + // Materialize inside `Using.resource` so the `planFiles()` + // CloseableIterable is released after collection. val fileScanTasksIterator: Iterator[FileScanTask] = table match { case Some(t) => val currentSnapshotId = Option(t.currentSnapshot()).map(_.snapshotId()) - val fileScanTasks = (lastSnapshotId, currentSnapshotId) match { - // Read from the start - case (None, Some(_)) => - val tasks = t.newScan().planFiles().iterator().asScala - lastSnapshotId = currentSnapshotId - tasks - - // Read incrementally from the last snapshot - case (Some(lastId), Some(currId)) if lastId != currId => - val tasks = t - .newIncrementalAppendScan() - .fromSnapshotExclusive(lastId) - .toSnapshot(currId) - .planFiles() - .iterator() - .asScala - lastSnapshotId = currentSnapshotId - tasks - - // No new data - case (Some(lastId), Some(currId)) if lastId == currId => - Iterator.empty - - // Default: No data yet - case _ => - Iterator.empty - } - fileScanTasks.toSeq.sortBy(_.file().fileSequenceNumber()).iterator + val fileScanTasks: Seq[FileScanTask] = + (lastSnapshotId, currentSnapshotId) match { + // Read from the start + case (None, Some(_)) => + val tasks = Using.resource(t.newScan().planFiles()) { ci => + ci.iterator().asScala.toSeq + } + lastSnapshotId = currentSnapshotId + tasks + + // Read incrementally from the last snapshot + case (Some(lastId), Some(currId)) if lastId != currId => + val tasks = Using.resource( + t + .newIncrementalAppendScan() + .fromSnapshotExclusive(lastId) + .toSnapshot(currId) + 
.planFiles() + ) { ci => ci.iterator().asScala.toSeq } + lastSnapshotId = currentSnapshotId + tasks + + // No new data + case (Some(lastId), Some(currId)) if lastId == currId => + Seq.empty + + // Default: No data yet + case _ => + Seq.empty + } + fileScanTasks.sortBy(_.file().fileSequenceNumber()).iterator case None => Iterator.empty @@ -255,6 +265,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( override def hasNext: Boolean = { if (numOfReturnedRecords >= totalRecordsToReturn) { + // Caller-imposed limit reached; release the active file's reader. + currentRecordIteratorCloser.close() + currentRecordIteratorCloser = () => () return false } @@ -274,11 +287,15 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( case Some(cols) => tableSchema.select(cols.asJava) case None => tableSchema } - currentRecordIterator = IcebergUtil.readDataFileAsIterator( + // Release the prior file's reader before opening the next. + currentRecordIteratorCloser.close() + val nextIter = IcebergUtil.readDataFileAsIterator( nextFile.file(), schemaToUse, table.get ) + currentRecordIteratorCloser = nextIter + currentRecordIterator = nextIter.asScala // Skip records within the file if necessary val recordsToSkipInFile = from - numOfSkippedRecords @@ -288,7 +305,13 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( } } - currentRecordIterator.hasNext + val hasMore = currentRecordIterator.hasNext + if (!hasMore) { + // All files exhausted; release the last file's reader. 
+ currentRecordIteratorCloser.close() + currentRecordIteratorCloser = () => () + } + hasMore } override def next(): T = { @@ -355,83 +378,89 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( } // Scan table files and aggregate statistics - table.newScan().includeColumnStats().planFiles().iterator().asScala.foreach { file => - val fileStats = file.file() - // Extract column-level statistics - val lowerBounds = - Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava) - val upperBounds = - Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava) - val nullCounts = - Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava) - val nanCounts = - Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava) - - fieldTypes.foreach { - case (field, (fieldId, fieldType)) => - val lowerBound = Option(lowerBounds.get(fieldId)) - val upperBound = Option(upperBounds.get(fieldId)) - val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L) - val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L) - val fieldStat = fieldStats(field) - - // Process min/max values for numerical types - if ( - fieldType == Types.IntegerType.get() || fieldType == Types.LongType - .get() || fieldType == Types.DoubleType.get() - ) { - lowerBound.foreach { buffer => - val minValue = - Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue() - fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue) - } + Using.resource(table.newScan().includeColumnStats().planFiles()) { tasks => + tasks.iterator().asScala.foreach { file => + val fileStats = file.file() + // Extract column-level statistics + val lowerBounds = + Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava) + val upperBounds = + Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava) + 
val nullCounts = + Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava) + val nanCounts = + Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava) + + fieldTypes.foreach { + case (field, (fieldId, fieldType)) => + val lowerBound = Option(lowerBounds.get(fieldId)) + val upperBound = Option(upperBounds.get(fieldId)) + val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L) + val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L) + val fieldStat = fieldStats(field) + + // Process min/max values for numerical types + if ( + fieldType == Types.IntegerType.get() || fieldType == Types.LongType + .get() || fieldType == Types.DoubleType.get() + ) { + lowerBound.foreach { buffer => + val minValue = + Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue() + fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue) + } - upperBound.foreach { buffer => - val maxValue = - Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue() - fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue) - } - } - // Process min/max values for timestamp types - else if ( - fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType - .withZone() - ) { - lowerBound.foreach { buffer => - val epochMicros = Conversions - .fromByteBuffer(Types.TimestampType.withoutZone(), buffer) - .asInstanceOf[Long] - val dateValue = - Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate - fieldStat("min") = - if ( - dateValue - .isBefore(LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter)) - ) - dateValue.format(dateFormatter) - else - fieldStat("min") + upperBound.foreach { buffer => + val maxValue = + Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue() + fieldStat("max") = 
Math.max(fieldStat("max").asInstanceOf[Double], maxValue) + } } + // Process min/max values for timestamp types + else if ( + fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType + .withZone() + ) { + lowerBound.foreach { buffer => + val epochMicros = Conversions + .fromByteBuffer(Types.TimestampType.withoutZone(), buffer) + .asInstanceOf[Long] + val dateValue = + Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate + fieldStat("min") = + if ( + dateValue + .isBefore( + LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter) + ) + ) + dateValue.format(dateFormatter) + else + fieldStat("min") + } - upperBound.foreach { buffer => - val epochMicros = Conversions - .fromByteBuffer(Types.TimestampType.withoutZone(), buffer) - .asInstanceOf[Long] - val dateValue = - Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate - fieldStat("max") = - if ( - dateValue - .isAfter(LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter)) - ) - dateValue.format(dateFormatter) - else - fieldStat("max") + upperBound.foreach { buffer => + val epochMicros = Conversions + .fromByteBuffer(Types.TimestampType.withoutZone(), buffer) + .asInstanceOf[Long] + val dateValue = + Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate + fieldStat("max") = + if ( + dateValue + .isAfter( + LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter) + ) + ) + dateValue.format(dateFormatter) + else + fieldStat("max") + } } - } - // Update non-null count - fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] + - (fileStats.recordCount().toLong - nullCount - nanCount) + // Update non-null count + fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] + + (fileStats.recordCount().toLong - nullCount - nanCount) + } } } fieldStats.map { @@ -478,7 +507,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef]( val 
fileScanTasks: Seq[FileScanTask] = { val table = this.catalog.loadTable(TableIdentifier.of(this.tableNamespace, this.tableName)) table.refresh() - table.newScan().planFiles().iterator().asScala.toSeq + Using.resource(table.newScan().planFiles()) { tasks => + tasks.iterator().asScala.toSeq + } } if (fileScanTasks.isEmpty) { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala index cee293a758..0b45b9eec3 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala @@ -27,7 +27,7 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders import org.apache.iceberg.data.{GenericRecord, Record} import org.apache.iceberg.aws.s3.S3FileIO import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO} -import org.apache.iceberg.io.{CloseableIterable, InputFile} +import org.apache.iceberg.io.{CloseableIterator, InputFile} import org.apache.iceberg.jdbc.JdbcCatalog import org.apache.iceberg.parquet.{Parquet, ParquetValueReader} import org.apache.iceberg.rest.RESTCatalog @@ -404,17 +404,23 @@ object IcebergUtil { } /** - * Util function to create a Record iterator over the given DataFile in Iceberg + * Returns a Record iterator over the given Iceberg DataFile. + * + * The returned `CloseableIterator` (Iceberg's iterator type) owns the + * underlying Parquet reader / S3InputStream / AWS HTTP-pool slot. The + * caller MUST close it once iteration is finished, otherwise those + * resources are leaked. 
+ * * @param dataFile the data file * @param schema the schema of the table * @param table the iceberg table - * @return an iterator over the records in the data file + * @return a closeable iterator over the records in the data file */ def readDataFileAsIterator( dataFile: DataFile, schema: IcebergSchema, table: Table - ): Iterator[Record] = { + ): CloseableIterator[Record] = { val inputFile: InputFile = table.io().newInputFile(dataFile) val readerFunc : java.util.function.Function[org.apache.parquet.schema.MessageType, ParquetValueReader[ @@ -422,13 +428,12 @@ object IcebergUtil { ]] = (messageType: org.apache.parquet.schema.MessageType) => GenericParquetReaders.buildReader(schema, messageType) - val closeableIterable: CloseableIterable[Record] = - Parquet - .read(inputFile) - .project(schema) - .createReaderFunc(readerFunc) - .build() - closeableIterable.iterator().asScala + Parquet + .read(inputFile) + .project(schema) + .createReaderFunc(readerFunc) + .build() + .iterator() } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 6184ce8dcd..0e9b2ae68a 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -282,6 +282,27 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter ) } + it should "expose written rows as a non-empty ZIP via asInputStream" in { + val items = generateSampleItems().take(3) + val writer = document.writer(UUID.randomUUID().toString) + writer.open() + items.foreach(writer.putOne) + writer.close() + + val stream = document.asInputStream() + try { + val bytes = stream.readAllBytes() + assert(bytes.nonEmpty, "asInputStream should yield non-empty 
bytes after writes") + // ZIP local-file-header magic bytes: 0x50 0x4B 0x03 0x04 ("PK\x03\x04"). + assert( + bytes(0) == 0x50.toByte && bytes(1) == 0x4b.toByte, + "expected ZIP magic bytes at the start of the stream" + ) + } finally { + stream.close() + } + } + /** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */ private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table = Proxy
