kecookier commented on issue #8128: URL: https://github.com/apache/incubator-gluten/issues/8128#issuecomment-2518915032
Detail of Vanilla Spark https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L131 ```scala private[memory] def acquireMemory( numBytes: Long, taskAttemptId: Long, maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (), computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized { assert(numBytes > 0, s"invalid number of bytes requested: $numBytes") // Keep looping until we're either sure that we don't want to grant this request (because this // task would have more than 1 / numActiveTasks of the memory) or we have enough free // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)). // TODO: simplify this to limit each task to its own slot while (true) { val numActiveTasks = memoryForTask.keys.size val curMem = memoryForTask(taskAttemptId) // In every iteration of this loop, we should first try to reclaim any borrowed execution // space from storage. This is necessary because of the potential race condition where new // storage blocks may steal the free execution memory that this task was waiting for. maybeGrowPool(numBytes - memoryFree) // Maximum size the pool would have after potentially growing the pool. // This is used to compute the upper bound of how much memory each task can occupy. This // must take into account potential free memory as well as the amount this pool currently // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management, // we did not take into account space that could have been freed by evicting cached blocks. val maxPoolSize = computeMaxPoolSize() val maxMemoryPerTask = maxPoolSize / numActiveTasks val minMemoryPerTask = poolSize / (2 * numActiveTasks) // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) // Only give it as much memory as is free, which might be none if it reached 1 / numTasks val toGrant = math.min(maxToGrant, memoryFree) // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking; // if we can't give it this much now, wait for other tasks to free up memory // (this happens if older tasks allocated lots of memory before N grew) if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) { logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free") lock.wait() } else { memoryForTask(taskAttemptId) += toGrant return toGrant } } 0L // Never reached } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
