Repository: spark
Updated Branches:
  refs/heads/master 15298b99a -> 529f84710

[SPARK-23040][CORE][FOLLOW-UP] Avoid double wrap result Iterator.

## What changes were proposed in this pull request?

Address https://github.com/apache/spark/pull/20449#discussion_r172414393: if `resultIter` is already an `InterruptibleIterator`, don't double wrap it.

## How was this patch tested?

Existing tests.

Author: Xingbo Jiang <[email protected]>

Closes #20920 from jiangxb1987/SPARK-23040.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/529f8471
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/529f8471
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/529f8471

Branch: refs/heads/master
Commit: 529f847105fa8d98a5dc4d20955e4870df6bc1c5
Parents: 15298b9
Author: Xingbo Jiang <[email protected]>
Authored: Sat Mar 31 10:34:01 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Sat Mar 31 10:34:01 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/shuffle/BlockStoreShuffleReader.scala   | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/529f8471/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 85e7e56..4103dfb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -111,8 +111,13 @@ private[spark] class BlockStoreShuffleReader[K, C](
       case None =>
         aggregatedIter
     }
-    // Use another interruptible iterator here to support task cancellation as aggregator or(and)
-    // sorter may have consumed previous interruptible iterator.
-    new InterruptibleIterator[Product2[K, C]](context, resultIter)
+
+    resultIter match {
+      case _: InterruptibleIterator[Product2[K, C]] => resultIter
+      case _ =>
+        // Use another interruptible iterator here to support task cancellation as aggregator
+        // or(and) sorter may have consumed previous interruptible iterator.
+        new InterruptibleIterator[Product2[K, C]](context, resultIter)
+    }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
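
The wrap-only-once idea in this patch can be tried outside Spark with a minimal sketch. `CancelFlag`, `CancellableIterator`, and `WrapOnce.wrapOnce` below are hypothetical stand-ins invented for illustration; they are not Spark classes and only approximate what `TaskContext` and `InterruptibleIterator` do. The sketch matches on the wildcard type `CancellableIterator[_]`, since element types are erased at runtime and only the wrapper class itself can be checked.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a task context: it only carries a cancellation flag.
final class CancelFlag {
  private val killed = new AtomicBoolean(false)
  def kill(): Unit = killed.set(true)
  def isKilled: Boolean = killed.get()
}

// Simplified stand-in for an interruptible iterator (an assumption, not Spark's class):
// it checks the flag on every hasNext call and otherwise delegates.
final class CancellableIterator[T](val flag: CancelFlag, val delegate: Iterator[T])
    extends Iterator[T] {
  def hasNext: Boolean = {
    if (flag.isKilled) throw new InterruptedException("task cancelled")
    delegate.hasNext
  }
  def next(): T = delegate.next()
}

object WrapOnce {
  // Return the iterator unchanged when it is already cancellable;
  // otherwise add exactly one cancellation-checking layer.
  def wrapOnce[T](flag: CancelFlag, iter: Iterator[T]): Iterator[T] = iter match {
    case _: CancellableIterator[_] => iter
    case _ => new CancellableIterator[T](flag, iter)
  }
}
```

Calling `WrapOnce.wrapOnce` on an iterator it has already wrapped returns the same instance, so each `hasNext` call pays for one cancellation check rather than one per wrapping layer.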
