kecookier commented on issue #8128:
URL: 
https://github.com/apache/incubator-gluten/issues/8128#issuecomment-2518915032

   Details of the vanilla Spark implementation:
   
https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L131
   ```scala
   private[memory] def acquireMemory(
         numBytes: Long,
         taskAttemptId: Long,
         maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
         computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
       assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
   
       // Keep looping until we're either sure that we don't want to grant this request (because this
       // task would have more than 1 / numActiveTasks of the memory) or we have enough free
       // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
       // TODO: simplify this to limit each task to its own slot
       while (true) {
         val numActiveTasks = memoryForTask.keys.size
         val curMem = memoryForTask(taskAttemptId)
   
         // In every iteration of this loop, we should first try to reclaim any borrowed execution
         // space from storage. This is necessary because of the potential race condition where new
         // storage blocks may steal the free execution memory that this task was waiting for.
         maybeGrowPool(numBytes - memoryFree)
   
         // Maximum size the pool would have after potentially growing the pool.
         // This is used to compute the upper bound of how much memory each task can occupy. This
         // must take into account potential free memory as well as the amount this pool currently
         // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
         // we did not take into account space that could have been freed by evicting cached blocks.
         val maxPoolSize = computeMaxPoolSize()
         val maxMemoryPerTask = maxPoolSize / numActiveTasks
         val minMemoryPerTask = poolSize / (2 * numActiveTasks)
   
         // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
         val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
         // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
         val toGrant = math.min(maxToGrant, memoryFree)
   
         // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
         // if we can't give it this much now, wait for other tasks to free up memory
         // (this happens if older tasks allocated lots of memory before N grew)
         if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
           logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
           lock.wait()
         } else {
           memoryForTask(taskAttemptId) += toGrant
           return toGrant
         }
       }
       0L  // Never reached
     }
   ```
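
To make the grant rule easier to reason about, here is a minimal sketch that pulls the arithmetic of one loop iteration out into a pure function. `GrantSketch` and `grantOnce` are hypothetical names, not part of Spark; the pool state (`memoryFree`, `poolSize`, `maxPoolSize`, `numActiveTasks`) is passed in as plain arguments, and `None` stands in for the `lock.wait()` branch.

```scala
// Hypothetical helper, not Spark code: one iteration of the acquireMemory loop
// with locking, pool growth and bookkeeping removed.
object GrantSketch {
  /** Some(bytes) when the request is granted (possibly partially, possibly 0),
    * None when the real code would block in lock.wait(). */
  def grantOnce(
      numBytes: Long,
      curMem: Long,
      memoryFree: Long,
      poolSize: Long,
      maxPoolSize: Long,
      numActiveTasks: Int): Option[Long] = {
    require(numBytes > 0, s"invalid number of bytes requested: $numBytes")
    require(numActiveTasks > 0, "at least one active task is required")

    // Upper bound per task is 1/N of the (possibly grown) pool;
    // guaranteed minimum before blocking is 1/(2N) of the current pool.
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)

    // Cap the request at the task's remaining share, then at what is actually free.
    val maxToGrant = math.min(numBytes, math.max(0L, maxMemoryPerTask - curMem))
    val toGrant = math.min(maxToGrant, memoryFree)

    // Block only if the request is not fully satisfied AND the task
    // would still be below its 1/(2N) guarantee.
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) None
    else Some(toGrant)
  }
}
```

With a 1000-byte pool and 4 active tasks (cap 250, guaranteed minimum 125), a task holding nothing that asks for 300 bytes gets 250 when the pool is free, would wait when only 100 bytes are free, and a task already at its 250-byte cap gets 0 back without blocking.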


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
