[
https://issues.apache.org/jira/browse/SAMZA-254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Riccomini updated SAMZA-254:
----------------------------------
Attachment: SAMZA-254-0.patch
Attaching a patch. RB at:
https://reviews.apache.org/r/20811/
Changes:
# Introduce a configuration parameter to the KV store called
"compaction.delete.threshold". Defaults to 1000.
# Use "compaction.delete.threshold" in LevelDbKeyValueStore to decide when to
trigger a full compaction on the underlying database. The full compaction
removes all deleted keys from the DB. By default, this means a full compaction
every 1000 deletes. Compactions are triggered lazily, on the read side. We also
have a write-side cache, which defers all deletes (and therefore compactions)
until flush, all, or range is called.
# Add a little test to measure performance.
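The delete-threshold idea described above can be sketched roughly as follows. This is a minimal illustration, not the code in the patch: CompactingStore, deleteCompactionThreshold, tombstones, and compactions are hypothetical names, and an in-memory TreeMap with tombstones stands in for the underlying database. The write-side cache is not modeled.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the LevelDB-backed store. Deleted keys
// stay behind as tombstones (None) until a compaction drops them, loosely
// mimicking how dead keys linger in the underlying database.
class CompactingStore(deleteCompactionThreshold: Int = 1000) {
  private val entries = mutable.TreeMap.empty[String, Option[String]]
  private var deletesSinceLastCompaction = 0
  var compactions = 0 // exposed only so the sketch can be inspected

  def put(key: String, value: String): Unit = {
    entries.put(key, Some(value))
  }

  def delete(key: String): Unit = {
    entries.put(key, None) // write a tombstone instead of removing the key
    deletesSinceLastCompaction += 1
  }

  // Read side: compact lazily, only once enough deletes have accumulated.
  def all: Iterator[(String, String)] = {
    maybeCompact()
    entries.iterator.collect { case (k, Some(v)) => (k, v) }
  }

  // Number of dead keys still physically present in the map.
  def tombstones: Int = entries.valuesIterator.count(_.isEmpty)

  private def maybeCompact(): Unit = {
    if (deletesSinceLastCompaction >= deleteCompactionThreshold) {
      val dead = entries.collect { case (k, None) => k }.toList
      dead.foreach(k => entries.remove(k))
      deletesSinceLastCompaction = 0
      compactions += 1
    }
  }
}
```

With a threshold of 3, two deletes followed by all() leave the tombstones in place; a third delete followed by all() triggers one compaction and removes them.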
With this patch, performance improves dramatically: total run time drops by
roughly 40-60% for every store type.
Before:
{noformat}
Performance test for store: cache
(1ms) Total in store: 0
(186) all() took 0ms.
(187ms) Total in store: 10000
(237) all() took 1ms.
(237ms) Total in store: 20000
(281) all() took 2ms.
(281ms) Total in store: 30000
...
(10271) all() took 191ms.
(10272ms) Total in store: 970000
(10394) all() took 96ms.
(10394ms) Total in store: 980000
(10483) all() took 60ms.
(10483ms) Total in store: 990000
(10575) all() took 64ms.
Total time: 10.575000000000001s
Performance test for store: serde
(0ms) Total in store: 0
(108) all() took 1ms.
(108ms) Total in store: 10000
(179) all() took 8ms.
(179ms) Total in store: 20000
(238) all() took 5ms.
(239ms) Total in store: 30000
(326) all() took 18ms.
...
(12809) all() took 163ms.
(12810ms) Total in store: 930000
(12966) all() took 98ms.
(12966ms) Total in store: 940000
(13097) all() took 79ms.
(13098ms) Total in store: 950000
(13254) all() took 101ms.
(13254ms) Total in store: 960000
(13427) all() took 110ms.
(13427ms) Total in store: 970000
(13594) all() took 116ms.
(13594ms) Total in store: 980000
(13834) all() took 169ms.
(13834ms) Total in store: 990000
(13999) all() took 108ms.
Total time: 13.999s
Performance test for store: cache-and-serde
(0ms) Total in store: 0
(38) all() took 0ms.
(38ms) Total in store: 10000
(82) all() took 1ms.
(83ms) Total in store: 20000
(123) all() took 1ms.
...
(9895ms) Total in store: 950000
(10101) all() took 174ms.
(10102ms) Total in store: 960000
(10295) all() took 165ms.
(10295ms) Total in store: 970000
(10421) all() took 98ms.
(10421ms) Total in store: 980000
(10515) all() took 61ms.
(10515ms) Total in store: 990000
(10608) all() took 64ms.
Total time: 10.608s
Performance test for store: leveldb
(0ms) Total in store: 0
(71) all() took 1ms.
(71ms) Total in store: 10000
(141) all() took 7ms.
(141ms) Total in store: 20000
(199) all() took 6ms.
(199ms) Total in store: 30000
...
(12257) all() took 78ms.
(12257ms) Total in store: 950000
(12412) all() took 98ms.
(12412ms) Total in store: 960000
(12580) all() took 111ms.
(12580ms) Total in store: 970000
(12753) all() took 121ms.
(12753ms) Total in store: 980000
(12989) all() took 173ms.
(12990ms) Total in store: 990000
(13163) all() took 110ms.
Total time: 13.163s
{noformat}
After:
{noformat}
Performance test for store: cache
(1ms) Total in store: 0
(194) all() took 10ms.
(195ms) Total in store: 10000
(250) all() took 13ms.
...
(4181ms) Total in store: 970000
(4219) all() took 14ms.
(4219ms) Total in store: 980000
(4258) all() took 11ms.
(4258ms) Total in store: 990000
(4296) all() took 14ms.
Total time: 4.297s
Performance test for store: serde
(0ms) Total in store: 0
(144) all() took 18ms.
(144ms) Total in store: 10000
(229) all() took 26ms.
(229ms) Total in store: 20000
(309) all() took 25ms.
...
(7833ms) Total in store: 970000
(7914) all() took 29ms.
(7914ms) Total in store: 980000
(7991) all() took 26ms.
(7991ms) Total in store: 990000
(8069) all() took 31ms.
Total time: 8.069s
Performance test for store: cache-and-serde
(0ms) Total in store: 0
(44) all() took 8ms.
(44ms) Total in store: 10000
(86) all() took 14ms.
(86ms) Total in store: 20000
(127) all() took 12ms.
...
(4028ms) Total in store: 970000
(4078) all() took 21ms.
(4078ms) Total in store: 980000
(4118) all() took 13ms.
(4118ms) Total in store: 990000
(4158) all() took 13ms.
Total time: 4.159s
Performance test for store: leveldb
(0ms) Total in store: 0
(74) all() took 15ms.
(74ms) Total in store: 10000
(157) all() took 31ms.
(157ms) Total in store: 20000
(230) all() took 23ms.
(230ms) Total in store: 30000
(312) all() took 30ms.
(312ms) Total in store: 40000
(388) all() took 25ms.
(389ms) Total in store: 50000
(473) all() took 31ms.
...
(7766ms) Total in store: 970000
(7846) all() took 29ms.
(7846ms) Total in store: 980000
(7922) all() took 24ms.
(7922ms) Total in store: 990000
(8007) all() took 32ms.
Total time: 8.007s
{noformat}
In conclusion, the patch improves every test run. Before:
{noformat}
Performance test for store: cache
Total time: 10.575000000000001s
Performance test for store: serde
Total time: 13.999s
Performance test for store: cache-and-serde
Total time: 10.608s
Performance test for store: leveldb
Total time: 13.163s
{noformat}
After:
{noformat}
Performance test for store: cache
Total time: 4.297s
Performance test for store: serde
Total time: 8.069s
Performance test for store: cache-and-serde
Total time: 4.159s
Performance test for store: leveldb
Total time: 8.007s
{noformat}
It also makes every test run more predictable: all() latency stays roughly
flat, rather than steadily increasing as it does without the patch.
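For reference, the per-store speedup can be worked out from the total times listed above. The snippet below is only a small arithmetic sketch; the Speedup object and its member names are made up for illustration, and the numbers are copied from the test output in this message.

```scala
object Speedup {
  // Total times in seconds, copied from the "Before" and "After" output.
  val before: Map[String, Double] = Map(
    "cache" -> 10.575,
    "serde" -> 13.999,
    "cache-and-serde" -> 10.608,
    "leveldb" -> 13.163)

  val after: Map[String, Double] = Map(
    "cache" -> 4.297,
    "serde" -> 8.069,
    "cache-and-serde" -> 4.159,
    "leveldb" -> 8.007)

  // Ratio of old total time to new total time for one store type.
  def speedup(store: String): Double = before(store) / after(store)

  def main(args: Array[String]): Unit = {
    before.keys.toSeq.sorted.foreach { store =>
      println(f"$store%-16s ${speedup(store)}%.2fx")
    }
  }
}
```

The ratios come out between roughly 1.6x and 2.6x, with the two cached stores benefiting the most.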
> store.all is slow
> -----------------
>
> Key: SAMZA-254
> URL: https://issues.apache.org/jira/browse/SAMZA-254
> Project: Samza
> Issue Type: Bug
> Components: kv
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0, 0.8.0
>
> Attachments: SAMZA-254-0.patch
>
>
> A common usage pattern with Samza stores is to periodically clear out the DB.
> Right now, we call store.all(), iterate over every key, and delete it.
> While using this style of processing, we noticed that the store.all call gets
> increasingly slow as we delete more keys from it:
> This code runs in 5 seconds:
> {code}
> @Test
> def testPerformance {
>   System.out.println("Performance test for store: %s" format typeOfStore)
>   val numLoops = 100
>   val messagesPerBatch = 10000
>   val stuff = b((0 until 200).map(i => "a").mkString)
>   val start = System.currentTimeMillis
>   (0 until numLoops).foreach(i => {
>     System.out.println("(%sms) Total in store: %s" format
>       (System.currentTimeMillis - start, i * messagesPerBatch))
>     (0 until messagesPerBatch).foreach(j => {
>       val k = b((i * j).toString)
>       store.put(k, stuff)
>       // store.delete(k)
>     })
>
>     val allStart = System.currentTimeMillis
>     val iter = store.all
>     System.out.println("(%s) all() took %sms." format
>       (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
>     iter.close
>   })
>   System.out.println("Total time: %ss" format ((System.currentTimeMillis -
>     start) * .001))
> }
> {code}
> Prints:
> {noformat}
> Performance test for store: cache
> (2ms) Total in store: 0
> (627) all() took 5ms.
> (628ms) Total in store: 10000
> (767) all() took 0ms.
> (767ms) Total in store: 20000
> (848) all() took 0ms.
> (848ms) Total in store: 30000
> (915) all() took 0ms.
> (916ms) Total in store: 40000
> (982) all() took 0ms.
> (982ms) Total in store: 50000
> (1046) all() took 0ms.
> (1046ms) Total in store: 60000
> (1100) all() took 0ms.
> (1101ms) Total in store: 70000
> ....
> (5360ms) Total in store: 940000
> (5406) all() took 1ms.
> (5406ms) Total in store: 950000
> (5447) all() took 0ms.
> (5447ms) Total in store: 960000
> (5499) all() took 0ms.
> (5499ms) Total in store: 970000
> (5558) all() took 0ms.
> (5558ms) Total in store: 980000
> (5605) all() took 0ms.
> (5605ms) Total in store: 990000
> (5654) all() took 1ms.
> Total time: 5.654s
> {noformat}
> The identical code, but with store.delete enabled, prints:
> {noformat}
> Performance test for store: cache
> (2ms) Total in store: 0
> (602) all() took 6ms.
> (602ms) Total in store: 10000
> (744) all() took 1ms.
> (745ms) Total in store: 20000
> (820) all() took 2ms.
> (820ms) Total in store: 30000
> (894) all() took 2ms.
> (895ms) Total in store: 40000
> (952) all() took 3ms.
> (952ms) Total in store: 50000
> (1006) all() took 4ms.
> (1006ms) Total in store: 60000
> (1061) all() took 5ms.
> (1061ms) Total in store: 70000
> (1116) all() took 5ms.
> (1116ms) Total in store: 80000
> ....
> (9450) all() took 50ms.
> (9451ms) Total in store: 910000
> (9548) all() took 53ms.
> (9549ms) Total in store: 920000
> (9650) all() took 56ms.
> (9650ms) Total in store: 930000
> (9757) all() took 60ms.
> (9757ms) Total in store: 940000
> (9865) all() took 62ms.
> (9866ms) Total in store: 950000
> (9977) all() took 64ms.
> (9978ms) Total in store: 960000
> (10093) all() took 68ms.
> (10093ms) Total in store: 970000
> (10211) all() took 70ms.
> (10211ms) Total in store: 980000
> (10346) all() took 74ms.
> (10346ms) Total in store: 990000
> (10472) all() took 76ms.
> Total time: 10.472s
> {noformat}
> The latency of all() clearly increases once deletes are added.
> I dug down a bit, and it turns out that this line in LevelDbKeyValueStore is
> the culprit:
> {code}
> iter.seekToFirst()
> {code}
> {noformat}
> (10478ms) Total in store: 970000
> db.iterator() took 0ms.
> iter.seekToFirst() took 98ms.
> (10606) all() took 99ms.
> (10606ms) Total in store: 980000
> db.iterator() took 0ms.
> iter.seekToFirst() took 65ms.
> (10699) all() took 65ms.
> (10699ms) Total in store: 990000
> db.iterator() took 0ms.
> iter.seekToFirst() took 66ms.
> (10794) all() took 66ms.
> {noformat}
> More digging shows that this is a common problem with LevelDB:
> http://code.google.com/p/leveldb/issues/detail?id=77
> So, as I see it, we have a few options:
> # Upgrade to RocksDB
> # Find a new way to clear DBs
> # Trigger compactions to clean up the dead keys in the DB
> (1) is definitely the ultimate goal. If (3) is doable, though, I'd like to
> get it into 0.7.0, since this is a pretty common use case for us. I'm not a
> fan of (2), because this problem also manifests itself when people don't want
> to drop the entire table, but do want to delete significant chunks of it
> periodically (e.g. deleting all expired keys from a join buffer).