[
https://issues.apache.org/jira/browse/SPARK-27991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenchen Fan resolved SPARK-27991.
---------------------------------
Fix Version/s: 3.2.0
Resolution: Fixed
Issue resolved by pull request 32287
[https://github.com/apache/spark/pull/32287]
> ShuffleBlockFetcherIterator should take Netty constant-factor overheads into
> account when limiting number of simultaneous block fetches
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-27991
> URL: https://issues.apache.org/jira/browse/SPARK-27991
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, Spark Core
> Affects Versions: 2.4.0
> Reporter: Josh Rosen
> Priority: Major
> Fix For: 3.2.0
>
>
> ShuffleBlockFetcherIterator has logic to limit the number of simultaneous
> block fetches. By default, this logic tries to keep the outstanding block
> fetches [beneath a data size
> limit|https://github.com/apache/spark/blob/v2.4.3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L274]
> ({{maxBytesInFlight}}). However, this limit does not account for fixed
> per-block overheads: even if a remote block is only, say, 4 KB, Netty's
> internal buffer sizes impose fixed minimums that can make the actual memory
> requirement substantially larger.
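> As a rough sketch of this limiting logic (a hypothetical simplification for
> illustration, not the actual iterator code; names like {{FetchThrottle}} and
> {{canFetch}} are invented here):
> {code:scala}
> // Hypothetical simplification of the in-flight limiting check -- not the
> // real ShuffleBlockFetcherIterator internals.
> case class FetchRequest(blocks: Seq[(String, Long)]) {
>   // Sum of the *reported* block sizes; the Netty buffers that actually
>   // back the responses can be larger than this.
>   val size: Long = blocks.map(_._2).sum
> }
>
> class FetchThrottle(maxBytesInFlight: Long, maxReqsInFlight: Int) {
>   private var bytesInFlight = 0L
>   private var reqsInFlight = 0
>
>   // Admit a request only while the reported bytes in flight stay under
>   // the limit (always allowing at least one outstanding request).
>   def canFetch(req: FetchRequest): Boolean =
>     bytesInFlight == 0 ||
>       (reqsInFlight + 1 <= maxReqsInFlight &&
>         bytesInFlight + req.size <= maxBytesInFlight)
>
>   def start(req: FetchRequest): Unit = { bytesInFlight += req.size; reqsInFlight += 1 }
>   def finish(req: FetchRequest): Unit = { bytesInFlight -= req.size; reqsInFlight -= 1 }
> }
> {code}
> Because {{req.size}} is built from reported block sizes, thousands of tiny
> blocks can pass this check while their backing buffers exhaust direct memory.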
> As a result, if a map stage produces a huge number of extremely tiny blocks,
> then we may see errors like:
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
>   at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
>   at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
> [...]
> Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
>   at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
>   at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
>   at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
>   at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
>   at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
>   at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
>   at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
>   at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
> [...]{code}
> SPARK-24989 is another report of this problem (but with a different proposed
> fix).
> This problem can currently be mitigated by setting
> {{spark.reducer.maxReqsInFlight}} to a value below its Int.MaxValue default
> (SPARK-6166), but this additional manual configuration step is cumbersome.
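> For reference, the workaround looks like the following (the cap of 256 is
> only an example value, not a recommendation):
> {code:scala}
> import org.apache.spark.SparkConf
>
> // Workaround sketch: explicitly cap concurrent fetch requests. The value
> // 256 is illustrative; tune it to the executor's direct-memory budget.
> val conf = new SparkConf()
>   .set("spark.reducer.maxReqsInFlight", "256")
> {code}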
> Instead, I think that Spark should take these fixed overheads into account in
> the {{maxBytesInFlight}} calculation: instead of using blocks' actual sizes,
> use {{Math.max(blockSize, minimumNettyBufferSize)}} so that tiny blocks are
> charged for the buffer space they actually occupy. There might be some tricky
> details involved in making this work on all configurations (e.g. using a
> different minimum when direct buffers are disabled, etc.), but I think the
> core idea behind the fix is pretty simple.
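> A minimal sketch of that accounting change (assuming a single
> {{minNettyBufferSize}} floor; the name and the 64 KB figure below are
> illustrative assumptions, not Netty's actual defaults):
> {code:scala}
> // Sketch only: charge each block at least the minimum Netty buffer it
> // will occupy, so maxBytesInFlight tracks real memory use rather than
> // reported block sizes.
> def accountedSize(blockSize: Long, minNettyBufferSize: Long): Long =
>   math.max(blockSize, minNettyBufferSize)
>
> // E.g. with an assumed 64 KB buffer floor, a 4 KB block counts as 64 KB
> // in flight, so far fewer tiny-block fetches run concurrently.
> {code}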
> This will improve Spark's stability and remove the configuration / tuning
> burden from end users.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)