Updated Branches:
  refs/heads/branch-0.8 08481679c -> 96670e716

Merge pull request #75 from JoshRosen/block-manager-cleanup

Code de-duplication in BlockManager

The BlockManager has a few methods that duplicate most of their code.  This 
pull request extracts the duplicated code into private doPut(), doGetLocal(), 
and doGetRemote() methods that unify the storing/reading of bytes or objects.

I believe that I preserved the logic of the original code, but I'd appreciate 
some help in reviewing this.
(cherry picked from commit edc5e3f8f44a658e9829f2ee65d5fb32b464121b)

Signed-off-by: Aaron Davidson <[email protected]>

Conflicts:
        core/src/main/scala/org/apache/spark/storage/BlockManager.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/07b3f01f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/07b3f01f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/07b3f01f

Branch: refs/heads/branch-0.8
Commit: 07b3f01f5bf4e2f81ac7abc4906118cf792434e1
Parents: 7e00dee
Author: Matei Zaharia <[email protected]>
Authored: Sun Oct 20 17:18:06 2013 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Mon Nov 4 23:32:56 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 458 +++++++------------
 1 file changed, 166 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/07b3f01f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ccc05f5..fbedfbc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import java.io.{File, InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
+import scala.collection.mutable.{HashMap, ArrayBuffer}
 import scala.util.Random
 
 import akka.actor.{ActorSystem, Cancellable, Props}
@@ -267,89 +267,14 @@ private[spark] class BlockManager(
    */
   def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
     logDebug("Getting local block " + blockId)
-    val info = blockInfo.get(blockId).orNull
-    if (info != null) {
-      info.synchronized {
-
-        // In the another thread is writing the block, wait for it to become ready.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure.")
-          return None
-        }
-
-        val level = info.level
-        logDebug("Level for block " + blockId + " is " + level)
-
-        // Look for the block in memory
-        if (level.useMemory) {
-          logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getValues(blockId) match {
-            case Some(iterator) =>
-              return Some(iterator)
-            case None =>
-              logDebug("Block " + blockId + " not found in memory")
-          }
-        }
-
-        // Look for block on disk, potentially loading it back into memory if required
-        if (level.useDisk) {
-          logDebug("Getting block " + blockId + " from disk")
-          if (level.useMemory && level.deserialized) {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                // Put the block back in memory before returning it
-                // TODO: Consider creating a putValues that also takes in a iterator ?
-                val elements = new ArrayBuffer[Any]
-                elements ++= iterator
-                memoryStore.putValues(blockId, elements, level, true).data match {
-                  case Left(iterator2) =>
-                    return Some(iterator2)
-                  case _ =>
-                    throw new Exception("Memory store did not return back an iterator")
-                }
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else if (level.useMemory && !level.deserialized) {
-            // Read it as a byte buffer into memory first, then return it
-            diskStore.getBytes(blockId) match {
-              case Some(bytes) =>
-                // Put a copy of the block back in memory before returning it. Note that we can't
-                // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
-                // The use of rewind assumes this.
-                assert (0 == bytes.position())
-                val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                copyForMemory.put(bytes)
-                memoryStore.putBytes(blockId, copyForMemory, level)
-                bytes.rewind()
-                return Some(dataDeserialize(blockId, bytes))
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                return Some(iterator)
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          }
-        }
-      }
-    } else {
-      logDebug("Block " + blockId + " not registered locally")
-    }
-    return None
+    doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
   }
 
   /**
    * Get block from the local block manager as serialized bytes.
    */
   def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
-    // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
     logDebug("Getting local block " + blockId + " as bytes")
-
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
@@ -360,12 +285,15 @@ private[spark] class BlockManager(
           throw new Exception("Block " + blockId + " not found on disk, though it should be")
       }
     }
+    doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+  }
 
+  private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
 
-        // In the another thread is writing the block, wait for it to become ready.
+        // If another thread is writing the block, wait for it to become ready.
         if (!info.waitForReady()) {
           // If we get here, the block write failed.
           logWarning("Block " + blockId + " was marked as failure.")
@@ -378,62 +306,104 @@ private[spark] class BlockManager(
         // Look for the block in memory
         if (level.useMemory) {
           logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              return Some(bytes)
+          val result = if (asValues) {
+            memoryStore.getValues(blockId)
+          } else {
+            memoryStore.getBytes(blockId)
+          }
+          result match {
+            case Some(values) =>
+              return Some(values)
             case None =>
               logDebug("Block " + blockId + " not found in memory")
           }
         }
 
-        // Look for block on disk
+        // Look for block on disk, potentially storing it back into memory if required:
         if (level.useDisk) {
-          // Read it as a byte buffer into memory first, then return it
-          diskStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              assert (0 == bytes.position())
-              if (level.useMemory) {
-                if (level.deserialized) {
-                  memoryStore.putBytes(blockId, bytes, level)
-                } else {
-                  // The memory store will hang onto the ByteBuffer, so give it a copy instead of
-                  // the memory-mapped file buffer we got from the disk store
-                  val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                  copyForMemory.put(bytes)
-                  memoryStore.putBytes(blockId, copyForMemory, level)
-                }
-              }
-              bytes.rewind()
-              return Some(bytes)
+          logDebug("Getting block " + blockId + " from disk")
+          val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+            case Some(bytes) => bytes
             case None =>
               throw new Exception("Block " + blockId + " not found on disk, though it should be")
           }
+          assert (0 == bytes.position())
+
+          if (!level.useMemory) {
+            // If the block shouldn't be stored in memory, we can just return it:
+            if (asValues) {
+              return Some(dataDeserialize(blockId, bytes))
+            } else {
+              return Some(bytes)
+            }
+          } else {
+            // Otherwise, we also have to store something in the memory store:
+            if (!level.deserialized || !asValues) {
+              // We'll store the bytes in memory if the block's storage level includes
+              // "memory serialized", or if it should be cached as objects in memory
+              // but we only requested its serialized bytes:
+              val copyForMemory = ByteBuffer.allocate(bytes.limit)
+              copyForMemory.put(bytes)
+              memoryStore.putBytes(blockId, copyForMemory, level)
+              bytes.rewind()
+            }
+            if (!asValues) {
+              return Some(bytes)
+            } else {
+              val values = dataDeserialize(blockId, bytes)
+              if (level.deserialized) {
+                // Cache the values before returning them:
+                // TODO: Consider creating a putValues that also takes in a iterator?
+                val valuesBuffer = new ArrayBuffer[Any]
+                valuesBuffer ++= values
+                memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
+                  case Left(values2) =>
+                    return Some(values2)
+                  case _ =>
+                    throw new Exception("Memory store did not return back an iterator")
+                }
+              } else {
+                return Some(values)
+              }
+            }
+          }
         }
       }
     } else {
       logDebug("Block " + blockId + " not registered locally")
     }
-    return None
+    None
   }
 
   /**
    * Get block from remote block managers.
    */
   def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
     logDebug("Getting remote block " + blockId)
-    // Get locations of block
-    val locations = Random.shuffle(master.getLocations(blockId))
+    doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+  }
+
+  /**
+   * Get block from remote block managers as serialized bytes.
+   */
+   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+    logDebug("Getting remote block " + blockId + " as bytes")
+    doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+   }
 
-    // Get block from remote locations
+  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+    require(blockId != null, "BlockId is null")
+    val locations = Random.shuffle(master.getLocations(blockId))
     for (loc <- locations) {
       logDebug("Getting remote block " + blockId + " from " + loc)
       val data = BlockManagerWorker.syncGetBlock(
         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
       if (data != null) {
-        return Some(dataDeserialize(blockId, data))
+        if (asValues) {
+          return Some(dataDeserialize(blockId, data))
+        } else {
+          return Some(data)
+        }
       }
       logDebug("The value of block " + blockId + " is null")
     }
@@ -442,31 +412,6 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from remote block managers as serialized bytes.
-   */
-   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
-     // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be
-     // refactored.
-     if (blockId == null) {
-       throw new IllegalArgumentException("Block Id is null")
-     }
-     logDebug("Getting remote block " + blockId + " as bytes")
-     
-     val locations = master.getLocations(blockId)
-     for (loc <- locations) {
-       logDebug("Getting remote block " + blockId + " from " + loc)
-       val data = BlockManagerWorker.syncGetBlock(
-         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
-       if (data != null) {
-         return Some(data)
-       }
-       logDebug("The value of block " + blockId + " is null")
-     }
-     logDebug("Block " + blockId + " not found")
-     return None
-   }
-
-  /**
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: BlockId): Option[Iterator[Any]] = {
@@ -533,17 +478,24 @@ private[spark] class BlockManager(
    * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
    */
   def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
-    tellMaster: Boolean = true) : Long = {
+          tellMaster: Boolean = true) : Long = {
+    require(values != null, "Values is null")
+    doPut(blockId, Left(values), level, tellMaster)
+  }
 
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (values == null) {
-      throw new IllegalArgumentException("Values is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
+  /**
+   * Put a new block of serialized bytes to the block manager.
+   */
+  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
+               tellMaster: Boolean = true) {
+    require(bytes != null, "Bytes is null")
+    doPut(blockId, Right(bytes), level, tellMaster)
+  }
+
+  private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer],
+                    level: StorageLevel, tellMaster: Boolean = true): Long = {
+    require(blockId != null, "BlockId is null")
+    require(level != null && level.isValid, "StorageLevel is null or invalid")
 
     // Remember the block's storage level so that we can correctly drop it to disk if it needs
     // to be dropped right after it got put into memory. Note, however, that other threads will
@@ -559,7 +511,8 @@ private[spark] class BlockManager(
           return oldBlockOpt.get.size
         }
 
-        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
+        // TODO: So the block info exists - but previous attempt to load it (?) failed.
+        // What do we do now ? Retry on it ?
         oldBlockOpt.get
       } else {
         tinfo
@@ -568,10 +521,10 @@ private[spark] class BlockManager(
 
     val startTimeMs = System.currentTimeMillis
 
-    // If we need to replicate the data, we'll want access to the values, but because our
-    // put will read the whole iterator, there will be no values left. For the case where
-    // the put serializes data, we'll remember the bytes, above; but for the case where it
-    // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
+    // If we're storing values and we need to replicate the data, we'll want access to the values,
+    // but because our put will read the whole iterator, there will be no values left. For the
+    // case where the put serializes data, we'll remember the bytes, above; but for the case where
+    // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
     var valuesAfterPut: Iterator[Any] = null
 
     // Ditto for the bytes after the put
@@ -580,30 +533,51 @@ private[spark] class BlockManager(
     // Size of the block in bytes (to return to caller)
     var size = 0L
 
+    // If we're storing bytes, then initiate the replication before storing them locally.
+    // This is faster as data is already serialized and ready to send.
+    val replicationFuture = if (data.isRight && level.replication > 1) {
+      val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
+      Future {
+        replicate(blockId, bufferView, level)
+      }
+    } else {
+      null
+    }
+
     myInfo.synchronized {
       logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
       var marked = false
       try {
-        if (level.useMemory) {
-          // Save it just to memory first, even if it also has useDisk set to true; we will later
-          // drop it to disk if the memory store can't hold it.
-          val res = memoryStore.putValues(blockId, values, level, true)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case Left(newIterator) => valuesAfterPut = newIterator
+        data match {
+          case Left(values) => {
+            if (level.useMemory) {
+              // Save it just to memory first, even if it also has useDisk set to true; we will
+              // drop it to disk later if the memory store can't hold it.
+              val res = memoryStore.putValues(blockId, values, level, true)
+              size = res.size
+              res.data match {
+                case Right(newBytes) => bytesAfterPut = newBytes
+                case Left(newIterator) => valuesAfterPut = newIterator
+              }
+            } else {
+              // Save directly to disk.
+              // Don't get back the bytes unless we replicate them.
+              val askForBytes = level.replication > 1
+              val res = diskStore.putValues(blockId, values, level, askForBytes)
+              size = res.size
+              res.data match {
+                case Right(newBytes) => bytesAfterPut = newBytes
+                case _ =>
+              }
+            }
           }
-        } else {
-          // Save directly to disk.
-          // Don't get back the bytes unless we replicate them.
-          val askForBytes = level.replication > 1
-          val res = diskStore.putValues(blockId, values, level, askForBytes)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case _ =>
+          case Right(bytes) => {
+            bytes.rewind()
+            // Store it only in memory at first, even if useDisk is also set to true
+            (if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
+            size = bytes.limit
           }
         }
 
@@ -628,125 +602,39 @@ private[spark] class BlockManager(
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
 
-    // Replicate block if required
+    // Either we're storing bytes and we asynchronously started replication, or we're storing
+    // values and need to serialize and replicate them now:
     if (level.replication > 1) {
-      val remoteStartTime = System.currentTimeMillis
-      // Serialize the block if not already done
-      if (bytesAfterPut == null) {
-        if (valuesAfterPut == null) {
-          throw new SparkException(
-            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
-        }
-        bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
-      }
-      replicate(blockId, bytesAfterPut, level)
-      logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
-    }
-    BlockManager.dispose(bytesAfterPut)
-
-    return size
-  }
-
-
-  /**
-   * Put a new block of serialized bytes to the block manager.
-   */
-  def putBytes(
-    blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
-
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (bytes == null) {
-      throw new IllegalArgumentException("Bytes is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
-
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
-      // Do atomically !
-      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
-      if (oldBlockOpt.isDefined) {
-        if (oldBlockOpt.get.waitForReady()) {
-          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-          return
-        }
-
-        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
-        oldBlockOpt.get
-      } else {
-        tinfo
-      }
-    }
-
-    val startTimeMs = System.currentTimeMillis
-
-    // Initiate the replication before storing it locally. This is faster as
-    // data is already serialized and ready for sending
-    val replicationFuture = if (level.replication > 1) {
-      val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
-      Future {
-        replicate(blockId, bufferView, level)
-      }
-    } else {
-      null
-    }
-
-    myInfo.synchronized {
-      logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
-        + " to get into synchronized block")
-
-      var marked = false
-      try {
-        if (level.useMemory) {
-          // Store it only in memory at first, even if useDisk is also set to true
-          bytes.rewind()
-          memoryStore.putBytes(blockId, bytes, level)
-        } else {
-          bytes.rewind()
-          diskStore.putBytes(blockId, bytes, level)
-        }
-
-        // assert (0 == bytes.position(), "" + bytes)
-
-        // Now that the block is in either the memory or disk store, let other threads read it,
-        // and tell the master about it.
-        marked = true
-        myInfo.markReady(bytes.limit)
-        if (tellMaster) {
-          reportBlockStatus(blockId, myInfo)
-        }
-      } finally {
-        // If we failed at putting the block to memory/disk, notify other possible readers
-        // that it has failed, and then remove it from the block info map.
-        if (! marked) {
-          // Note that the remove must happen before markFailure otherwise another thread
-          // could've inserted a new BlockInfo before we remove it.
-          blockInfo.remove(blockId)
-          myInfo.markFailure()
-          logWarning("Putting block " + blockId + " failed")
+      data match {
+        case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
+        case Left(values) => {
+          val remoteStartTime = System.currentTimeMillis
+          // Serialize the block if not already done
+          if (bytesAfterPut == null) {
+            if (valuesAfterPut == null) {
+              throw new SparkException(
+                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+            }
+            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
+          }
+          replicate(blockId, bytesAfterPut, level)
+          logDebug("Put block " + blockId + " remotely took " +
+            Utils.getUsedTimeMs(remoteStartTime))
         }
       }
     }
 
-    // If replication had started, then wait for it to finish
-    if (level.replication > 1) {
-      Await.ready(replicationFuture, Duration.Inf)
-    }
+    BlockManager.dispose(bytesAfterPut)
 
     if (level.replication > 1) {
-      logDebug("PutBytes for block " + blockId + " with replication took " +
+      logDebug("Put for block " + blockId + " with replication took " +
         Utils.getUsedTimeMs(startTimeMs))
     } else {
-      logDebug("PutBytes for block " + blockId + " without replication took " +
+      logDebug("Put for block " + blockId + " without replication took " +
         Utils.getUsedTimeMs(startTimeMs))
     }
+
+    size
   }
 
   /**
@@ -871,34 +759,20 @@ private[spark] class BlockManager(
 
   private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
     logInfo("Dropping non broadcast blocks older than " + cleanupTime)
-    val iterator = blockInfo.internalMap.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime && !id.isBroadcast) {
-        info.synchronized {
-          val level = info.level
-          if (level.useMemory) {
-            memoryStore.remove(id)
-          }
-          if (level.useDisk) {
-            diskStore.remove(id)
-          }
-          iterator.remove()
-          logInfo("Dropped block " + id)
-        }
-        reportBlockStatus(id, info)
-      }
-    }
+    dropOldBlocks(cleanupTime, !_.isBroadcast)
   }
 
   private def dropOldBroadcastBlocks(cleanupTime: Long) {
     logInfo("Dropping broadcast blocks older than " + cleanupTime)
+    dropOldBlocks(cleanupTime, _.isBroadcast)
+  }
+
+  private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) {
     val iterator = blockInfo.internalMap.entrySet().iterator()
     while (iterator.hasNext) {
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime && id.isBroadcast) {
+      if (time < cleanupTime && shouldDrop(id)) {
         info.synchronized {
           val level = info.level
           if (level.useMemory) {

Reply via email to