This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 3d1317f [SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should
close underlying input streams
3d1317f is described below
commit 3d1317f8657beddfc6e8a5e49dbbbaaefdff1a5c
Author: Kevin Sewell <[email protected]>
AuthorDate: Thu Feb 24 08:14:07 2022 -0800
[SPARK-38273][SQL] `decodeUnsafeRows`'s iterators should close underlying
input streams
### What changes were proposed in this pull request?
Wrap the DataInputStream in the SparkPlan.decodeUnsafeRows method in
a NextIterator instead of a plain Iterator; this allows us to close the
DataInputStream properly. This happens in the Spark driver only.
### Why are the changes needed?
SPARK-34647 replaced the ZstdInputStream with ZstdInputStreamNoFinalizer.
This meant that all usages of `CompressionCodec.compressedInputStream` would
need to manually close the stream as this would no longer be handled by the
finaliser mechanism.
In SparkPlan, the result of `CompressionCodec.compressedInputStream` is
wrapped in an Iterator that never calls `close()`, so the stream is leaked.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
#### Spark Shell Configuration
```bash
$> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g"
$> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd
```
#### Test Script
```scala
import java.sql.Timestamp
import java.time.Instant
import spark.implicits._
case class Record(timestamp: Timestamp, batch: Long, value: Long)
(1 to 300).foreach { batch =>
sc.parallelize(1 to 1000000).map(Record(Timestamp.from(Instant.now()),
batch, _)).toDS.write.parquet(s"test_data/batch_$batch")
}
(1 to 300).foreach(batch =>
spark.read.parquet(s"test_data/batch_$batch").as[Record].repartition().collect())
```
#### Memory Monitor
```shell
$> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> |
grep "total kB" | awk '{print $4}'); sleep 10; done;
```
#### Results
##### Before
```
"2022-02-22 11:55:23",1400016
"2022-02-22 11:55:33",1522024
"2022-02-22 11:55:43",1587812
"2022-02-22 11:55:53",1631868
"2022-02-22 11:56:03",1657252
"2022-02-22 11:56:13",1659728
"2022-02-22 11:56:23",1664640
"2022-02-22 11:56:33",1674152
"2022-02-22 11:56:43",1697320
"2022-02-22 11:56:53",1689636
"2022-02-22 11:57:03",1783888
"2022-02-22 11:57:13",1896920
"2022-02-22 11:57:23",1950492
"2022-02-22 11:57:33",2010968
"2022-02-22 11:57:44",2066560
"2022-02-22 11:57:54",2108232
"2022-02-22 11:58:04",2158188
"2022-02-22 11:58:14",2211344
"2022-02-22 11:58:24",2260180
"2022-02-22 11:58:34",2316352
"2022-02-22 11:58:44",2367412
"2022-02-22 11:58:54",2420916
"2022-02-22 11:59:04",2472132
"2022-02-22 11:59:14",2519888
"2022-02-22 11:59:24",2571372
"2022-02-22 11:59:34",2621992
"2022-02-22 11:59:44",2672400
"2022-02-22 11:59:54",2728924
"2022-02-22 12:00:04",2777712
"2022-02-22 12:00:14",2834272
"2022-02-22 12:00:24",2881344
"2022-02-22 12:00:34",2935552
"2022-02-22 12:00:44",2984896
"2022-02-22 12:00:54",3034116
"2022-02-22 12:01:04",3087092
"2022-02-22 12:01:14",3134432
"2022-02-22 12:01:25",3198316
"2022-02-22 12:01:35",3193484
"2022-02-22 12:01:45",3193212
"2022-02-22 12:01:55",3192872
"2022-02-22 12:02:05",3191772
"2022-02-22 12:02:15",3187780
"2022-02-22 12:02:25",3177084
"2022-02-22 12:02:35",3173292
"2022-02-22 12:02:45",3173292
"2022-02-22 12:02:55",3173292
```
##### After
```
"2022-02-22 12:05:03",1377124
"2022-02-22 12:05:13",1425132
"2022-02-22 12:05:23",1564060
"2022-02-22 12:05:33",1616116
"2022-02-22 12:05:43",1637448
"2022-02-22 12:05:53",1637700
"2022-02-22 12:06:03",1653912
"2022-02-22 12:06:13",1659532
"2022-02-22 12:06:23",1673368
"2022-02-22 12:06:33",1687580
"2022-02-22 12:06:43",1711076
"2022-02-22 12:06:53",1849752
"2022-02-22 12:07:03",1861528
"2022-02-22 12:07:13",1871200
"2022-02-22 12:07:24",1878860
"2022-02-22 12:07:34",1879332
"2022-02-22 12:07:44",1886552
"2022-02-22 12:07:54",1884160
"2022-02-22 12:08:04",1880924
"2022-02-22 12:08:14",1876084
"2022-02-22 12:08:24",1878800
"2022-02-22 12:08:34",1879068
"2022-02-22 12:08:44",1880088
"2022-02-22 12:08:54",1880160
"2022-02-22 12:09:04",1880496
"2022-02-22 12:09:14",1891672
"2022-02-22 12:09:24",1878552
"2022-02-22 12:09:34",1876136
"2022-02-22 12:09:44",1890056
"2022-02-22 12:09:54",1878076
"2022-02-22 12:10:04",1882440
"2022-02-22 12:10:14",1893172
"2022-02-22 12:10:24",1894216
"2022-02-22 12:10:34",1894204
"2022-02-22 12:10:44",1894716
"2022-02-22 12:10:54",1894720
"2022-02-22 12:11:04",1894720
"2022-02-22 12:11:15",1895232
"2022-02-22 12:11:25",1895496
"2022-02-22 12:11:35",1895496
```
Closes #35613 from kevins-29/spark-38273.
Lead-authored-by: Kevin Sewell <[email protected]>
Co-authored-by: kevins-29 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 43c89dca89d1a4c0dc63354f46b5bd4b39cdda65)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/execution/SparkPlan.scala | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2fbfe4d..10cda2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
object SparkPlan {
/** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted.
*/
@@ -370,10 +371,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with
Logging with Serializ
val bis = new ByteArrayInputStream(bytes)
val ins = new DataInputStream(codec.compressedInputStream(bis))
- new Iterator[InternalRow] {
+ new NextIterator[InternalRow] {
private var sizeOfNextRow = ins.readInt()
- override def hasNext: Boolean = sizeOfNextRow >= 0
- override def next(): InternalRow = {
+ private def _next(): InternalRow = {
val bs = new Array[Byte](sizeOfNextRow)
ins.readFully(bs)
val row = new UnsafeRow(nFields)
@@ -381,6 +381,22 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with
Logging with Serializ
sizeOfNextRow = ins.readInt()
row
}
+
+ override def getNext(): InternalRow = {
+ if (sizeOfNextRow >= 0) {
+ try {
+ _next()
+ } catch {
+ case t: Throwable if ins != null =>
+ ins.close()
+ throw t
+ }
+ } else {
+ finished = true
+ null
+ }
+ }
+ override def close(): Unit = ins.close()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]