szehon-ho opened a new pull request, #8132:
URL: https://github.com/apache/iceberg/pull/8132
### Problem
We observed input stream leaks and S3 input stream exhaustion in a simple
Spark job reading a partition with several delete files, where several
tasks on the same worker were applying deletes.
<details>
<summary>Spark S3 Exception</summary>
```
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute
HTTP request: The target server failed to respond
at
software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
~[sdk-core-2.15.40.jar:?]
at
software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
~[sdk-core-2.15.40.jar:?]
at
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:205)
~[sdk-core-2.15.40.jar:?]
```
</details>
<details>
<summary>Spark S3 Warnings</summary>
```
WARN S3InputStream: Unclosed input stream created by:
org.apache.iceberg.aws.s3.S3InputStream.<init>(S3InputStream.java:73)
org.apache.iceberg.aws.s3.S3InputFile.newStream(S3InputFile.java:83)
org.apache.iceberg.parquet.ParquetIO$ParquetInputFile.newStream(ParquetIO.java:184)
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:774)
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658)
org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:245)
org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:71)
org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:91)
org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:37)
org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:188)
org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:187)
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
org.apache.iceberg.util.SortedMerge.iterator(SortedMerge.java:56)
org.apache.iceberg.deletes.Deletes$PositionStreamDeleteIterable.<init>(Deletes.java:214)
org.apache.iceberg.deletes.Deletes$PositionStreamDeleteFilter.<init>(Deletes.java:263)
org.apache.iceberg.deletes.Deletes.streamingFilter(Deletes.java:157)
org.apache.iceberg.data.DeleteFilter.applyPosDeletes(DeleteFilter.java:258)
org.apache.iceberg.data.DeleteFilter.filter(DeleteFilter.java:154)
org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:92)
org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:42)
org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:135)
org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
scala.Option.exists(Option.scala:376)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:224)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:136)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1513)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
```
</details>
### Description
Although this manifested in Spark, it appears to be a generic problem in Core.
Applying a DeleteFile to a DataFile has two cases:
* If the size of the deletes is under a threshold, we first build a
PositionDeleteIndex, loading all the delete positions into a set.
* If the size is over the threshold, we stream the delete file alongside the
data file and either filter out or mark the deleted data rows (depending on
the use case).
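The two strategies above can be sketched roughly as follows. This is an illustrative simplification, not the actual Iceberg `Deletes` API; the class, method names, and threshold are all hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the two delete-application strategies.
class DeleteApplySketch {
    // Assumed threshold below which delete positions fit comfortably in memory
    static final long SET_THRESHOLD = 100_000;

    // Case 1: small delete files -> materialize all positions into an index (a set)
    static Set<Long> buildPositionIndex(List<Long> deletePositions) {
        return new TreeSet<>(deletePositions);
    }

    // Case 2: large delete files -> stream-merge sorted delete positions
    // against the data rows, never holding all positions in memory
    static long countLiveRows(Iterator<Long> sortedDeletes, long rowCount) {
        long live = 0;
        long nextDelete = sortedDeletes.hasNext() ? sortedDeletes.next() : -1;
        for (long pos = 0; pos < rowCount; pos++) {
            if (pos == nextDelete) {
                // this row is deleted; advance to the next delete position
                nextDelete = sortedDeletes.hasNext() ? sortedDeletes.next() : -1;
            } else {
                live++;
            }
        }
        return live;
    }
}
```

The streaming case is the one where the delete file is read through a live iterator, which is why its lifecycle matters.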
The bug concerns the second case: here, we do not close the DeleteFile
iterator when we close the returned row iterator.
There is code that adds the DeleteFile iterator to the closeableGroup of the
Iterable, but the problem is that we never return the Iterable to Spark; we
return only the Iterator.
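The leak pattern can be reproduced in miniature. This is a simplified stand-in, not the actual Iceberg `CloseableIterable`: the underlying stream is registered for close on the Iterable, but only the Iterator escapes to the caller, so `close()` is never reached.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Illustrative reproduction of the leak: close() lives on the Iterable,
// but the consumer only ever sees the Iterator.
class LeakSketch {
    static boolean streamClosed = false;

    static class FakeStream implements Closeable {
        public void close() { streamClosed = true; }
    }

    static class CloseableIterableSketch implements Closeable, Iterable<Long> {
        final FakeStream stream = new FakeStream();  // stands in for the delete-file stream
        final List<Long> rows = List.of(1L, 2L);
        public Iterator<Long> iterator() { return rows.iterator(); }
        public void close() throws IOException { stream.close(); }
    }

    public static void main(String[] args) {
        CloseableIterableSketch iterable = new CloseableIterableSketch();
        Iterator<Long> it = iterable.iterator();  // only the Iterator escapes
        while (it.hasNext()) { it.next(); }       // consumer walks to the end
        // iterable.close() is never called, so the stream stays open (the leak)
    }
}
```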
### Fix
Close the DeleteFile iterator when we close the final iterator returned to
Spark.
The other part of the problem is that Spark does not explicitly close the row
iterator. It just walks to the end of it, and it is the iterator's
responsibility to close itself when it is found exhausted on the last
hasNext(). See our FilterIterator, for instance, which does exactly this.
Hence, the same behavior is now implemented in the markDeleted (CDC) versions
of the delete iterators, which do not use FilterIterator.
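A minimal sketch of the "close when exhausted" contract the fix relies on (this is an illustration of the pattern, not the actual FilterIterator code): when hasNext() first returns false, the wrapper closes its resource, so a consumer like Spark that only exhausts the iterator still releases it.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch of an iterator that releases its resource once exhausted.
class SelfClosingIterator<T> implements Iterator<T>, Closeable {
    private final Iterator<T> delegate;
    private final Closeable resource;  // e.g. the underlying delete-file stream
    private boolean closed = false;

    SelfClosingIterator(Iterator<T> delegate, Closeable resource) {
        this.delegate = delegate;
        this.resource = resource;
    }

    public boolean hasNext() {
        boolean more = delegate.hasNext();
        if (!more) {
            close();  // exhausted: release the resource right away
        }
        return more;
    }

    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return delegate.next();
    }

    public void close() {
        if (!closed) {
            closed = true;
            try {
                resource.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```

An explicit close() is still honored for consumers that do call it; the guard flag makes close idempotent either way.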
Refs:
* https://github.com/apache/spark/blob/bdeae87067452bb41f4776c4ab444a9d9645fdfc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L110
* https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/io/FilterIterator.java
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]