Chris Riccomini created SAMZA-135:
-------------------------------------

             Summary: LevelDbKeyValueStore leaks memory on putAll
                 Key: SAMZA-135
                 URL: https://issues.apache.org/jira/browse/SAMZA-135
             Project: Samza
          Issue Type: Bug
          Components: kv
    Affects Versions: 0.6.0
            Reporter: Chris Riccomini
            Assignee: Chris Riccomini


Calling store.all(); store.close(); in a tight loop shows that memory is 
being leaked.

Digging into it a bit more: when we use a KeyValueStore with a cache and 
call store.all, the cache does:

{code}
  def all() = {
    metrics.alls.inc
    flush()
    store.all()
  }
{code}

In turn, flush() does:

{code}
  def flush() {
    trace("Flushing.")

    metrics.flushes.inc

    // write out the contents of the dirty list oldest first
    val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
    for (k <- this.dirty.reverse) {
      val entry = this.cache.get(k)
      entry.dirty = null // not dirty any more
      batch.add(new Entry(k, entry.value))
    }
    store.putAll(batch)
    store.flush
    metrics.flushBatchSize.inc(batch.size)

    // reset the dirty list
    this.dirty = new mutable.DoubleLinkedList[K]()
    this.dirtyCount = 0
  }
{code}

The store.putAll call in this code invokes LevelDbKeyValueStore.putAll, which has:

{code}
  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
    val batch = db.createWriteBatch()
    val iter = entries.iterator
    var wrote = 0
    var deletes = 0
    while (iter.hasNext) {
      wrote += 1
      val curr = iter.next()
      if (curr.getValue == null) {
        deletes += 1
        batch.delete(curr.getKey)
      } else {
        val key = curr.getKey
        val value = curr.getValue
        metrics.bytesWritten.inc(key.size + value.size)
        batch.put(key, value)
      }
    }
    db.write(batch)
    batch.close
    metrics.puts.inc(wrote)
    metrics.deletes.inc(deletes)
  }
{code}

According to the docs on https://github.com/fusesource/leveldbjni, the batch 
needs to be closed!

{code}
WriteBatch batch = db.createWriteBatch();
try {
  batch.delete(bytes("Denver"));
  batch.put(bytes("Tampa"), bytes("green"));
  batch.put(bytes("London"), bytes("red"));

  db.write(batch);
} finally {
  // Make sure you close the batch to avoid resource leaks.
  batch.close();
}
{code}

This should be a one-line fix.
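
To illustrate why the close matters, here is a minimal, self-contained sketch of the close-in-finally pattern from the leveldbjni docs. It does not use leveldbjni itself: FakeBatch, write, and the open-batch counter are hypothetical stand-ins invented for this example so that it runs without the native library. The counter only models a native resource that stays allocated until close() is called.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchCloseSketch {
    // Hypothetical model of native memory: number of batches created but not yet closed.
    static final AtomicInteger openBatches = new AtomicInteger();

    // Hypothetical stand-in for a leveldbjni WriteBatch (not the real class).
    static final class FakeBatch implements Closeable {
        FakeBatch() { openBatches.incrementAndGet(); }
        @Override public void close() { openBatches.decrementAndGet(); }
    }

    // Hypothetical stand-in for db.write(batch); does nothing in this sketch.
    static void write(FakeBatch batch) { }

    // The leaky shape: the batch is written but never closed.
    static void putAllWithoutClose() {
        FakeBatch batch = new FakeBatch();
        write(batch);
    }

    // The shape recommended by the leveldbjni docs: close in a finally block.
    static void putAllWithClose() {
        FakeBatch batch = new FakeBatch();
        try {
            write(batch);
        } finally {
            batch.close();
        }
    }

    // Runs one variant in a tight loop and reports how many batches remain open.
    public static int leakedAfter(boolean closeBatch, int iterations) {
        openBatches.set(0);
        for (int i = 0; i < iterations; i++) {
            if (closeBatch) {
                putAllWithClose();
            } else {
                putAllWithoutClose();
            }
        }
        return openBatches.get();
    }

    public static void main(String[] args) {
        System.out.println("open batches without close: " + leakedAfter(false, 1000)); // 1000
        System.out.println("open batches with close:    " + leakedAfter(true, 1000));  // 0
    }
}
```

Putting the close in a finally block, rather than on the line after the write, also covers the case where the write throws.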



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
