Chris Riccomini created SAMZA-135:
-------------------------------------
Summary: LevelDbKeyValueStore leaks memory on putAll
Key: SAMZA-135
URL: https://issues.apache.org/jira/browse/SAMZA-135
Project: Samza
Issue Type: Bug
Components: kv
Affects Versions: 0.6.0
Reporter: Chris Riccomini
Assignee: Chris Riccomini
Calling store.all(); store.close(); in a tight loop shows that memory is
being leaked.
Digging into it a bit more: when we use a KeyValueStore with a cache and
call store.all, the cache does:
{code}
def all() = {
  metrics.alls.inc
  flush()
  store.all()
}
{code}
In turn, flush() does:
{code}
def flush() {
  trace("Flushing.")
  metrics.flushes.inc
  // write out the contents of the dirty list oldest first
  val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
  for (k <- this.dirty.reverse) {
    val entry = this.cache.get(k)
    entry.dirty = null // not dirty any more
    batch.add(new Entry(k, entry.value))
  }
  store.putAll(batch)
  store.flush
  metrics.flushBatchSize.inc(batch.size)
  // reset the dirty list
  this.dirty = new mutable.DoubleLinkedList[K]()
  this.dirtyCount = 0
}
{code}
The store.putAll call in this code invokes LevelDbKeyValueStore.putAll, which has:
{code}
def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
  val batch = db.createWriteBatch()
  val iter = entries.iterator
  var wrote = 0
  var deletes = 0
  while (iter.hasNext) {
    wrote += 1
    val curr = iter.next()
    if (curr.getValue == null) {
      deletes += 1
      batch.delete(curr.getKey)
    } else {
      val key = curr.getKey
      val value = curr.getValue
      metrics.bytesWritten.inc(key.size + value.size)
      batch.put(key, value)
    }
  }
  db.write(batch)
  batch.close
  metrics.puts.inc(wrote)
  metrics.deletes.inc(deletes)
}
{code}
According to the docs on https://github.com/fusesource/leveldbjni, the batch
needs to be closed!
{code}
WriteBatch batch = db.createWriteBatch();
try {
  batch.delete(bytes("Denver"));
  batch.put(bytes("Tampa"), bytes("green"));
  batch.put(bytes("London"), bytes("red"));
  db.write(batch);
} finally {
  // Make sure you close the batch to avoid resource leaks.
  batch.close();
}
{code}
This should be a one-line fix.
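For illustration only, here is a minimal Scala sketch of the try/finally pattern from the leveldbjni docs, written so that the batch is closed even if the write throws. It assumes the write batch can be treated as a java.io.Closeable. The names closing and FakeBatch are hypothetical and exist in neither Samza nor leveldbjni; FakeBatch only stands in for the result of db.createWriteBatch() so the sketch runs without the native library.

```scala
import java.io.Closeable

object BatchCloseSketch {
  // Hypothetical helper: runs `body` against a Closeable resource and
  // closes the resource afterwards, whether or not `body` throws.
  def closing[R <: Closeable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()

  // Stand-in for a leveldbjni write batch (assumption, for the demo only).
  final class FakeBatch extends Closeable {
    var closed = false
    def close(): Unit = { closed = true }
  }

  def main(args: Array[String]): Unit = {
    // Normal path: the batch is closed after the body completes.
    val ok = new FakeBatch
    closing(ok)(_ => ())
    assert(ok.closed)

    // Failure path: the batch is still closed when the body throws.
    val failing = new FakeBatch
    try closing(failing)(_ => throw new RuntimeException("write failed"))
    catch { case _: RuntimeException => () }
    assert(failing.closed)
  }
}
```

In putAll, the same shape would wrap the put/delete loop and db.write(batch) in the try body, with batch.close in the finally clause.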