Enrico Minack created SPARK-55469:
-------------------------------------
Summary: ShuffleBlockFetcherIterator.initialize() reads all blocks
from fallback storage
Key: SPARK-55469
URL: https://issues.apache.org/jira/browse/SPARK-55469
Project: Spark
Issue Type: Sub-task
Components: Spark Core
Affects Versions: 4.2.0
Reporter: Enrico Minack
The {{ShuffleBlockFetcherIterator}} is used to read shuffle block data from
various locations:
- local blocks (blocks owned by the executor)
- host local blocks (other executors on the same host)
- local push merged blocks
- remote blocks (other executors on other hosts)
- fallback storage blocks
{{ShuffleBlockFetcherIterator.initialize()}} starts fetching remote blocks
asynchronously (up to a certain number of bytes in flight) and reads all other
blocks lazily: it creates a {{ManagedBuffer}} that does not hold any data
until the data is consumed.
This is all done to have a small memory footprint while iterating over all
shuffle block data.
Reading from the fallback storage is an exception. It allocates the buffer
eagerly and reads all block data in {{initialize}}.
{code}
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
  f.seek(offset)
  f.readFully(array)
  logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
}
new NioManagedBuffer(ByteBuffer.wrap(array))
{code}
https://github.com/apache/spark/blob/ebd5b007fcf203eadcf8b037ab2b99577490f869/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala#L206-L213
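A deferred read, analogous to how the other lazy block types behave, would wrap the fallback read in a buffer that only materializes data on first consumption. The following is a minimal illustrative sketch, not a proposed patch: {{LazyFallbackBuffer}} is a hypothetical name, and a real fix would implement Spark's {{ManagedBuffer}} interface against the fallback {{FileSystem}}.

```scala
import java.nio.ByteBuffer

// Hypothetical wrapper (not Spark API): the read closure runs only
// when the buffer is first consumed, not at construction time.
final class LazyFallbackBuffer(read: () => Array[Byte]) {
  private lazy val bytes: Array[Byte] = read()
  def nioByteBuffer(): ByteBuffer = ByteBuffer.wrap(bytes)
}

var reads = 0
val buf = new LazyFallbackBuffer(() => { reads += 1; Array[Byte](1, 2, 3) })
assert(reads == 0)                        // nothing read at initialize() time
val bb = buf.nioByteBuffer()
assert(reads == 1 && bb.remaining() == 3) // data read only on first consumption
```

With such a wrapper, {{initialize()}} would register fallback blocks without allocating their byte arrays, matching the small-memory-footprint design of the other block types.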
In situations where all blocks are fetched from the fallback storage, the
entire shuffle data is held in memory before {{ShuffleBlockFetcherIterator}}
starts iterating over it.