This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 054084fb69e2e55ad8ddd4b858ad5d4902db2d37 Author: Adar Dembo <[email protected]> AuthorDate: Tue Feb 26 00:56:42 2019 -0800 experiments: merge iterator optimization tests Here's a brief exploration into various MergeIterator algorithms, prototyped in Python. Only after I was done did I see that there was an existing experiment on this same subject in C++ (see merge-test.cc). It's not all wasted work though; that experiment didn't include the new "hot/cold" heap algorithms, nor did it account for all MergeIterator quirks such as paged blocks and lower/upper bounds. Below are some timing results on a big el7 machine. The "real" input was a representative (i.e. mostly compacted) 40GB tablet: - NaiveMergeIterator, half-overlapping: 44.7854778767s Counter({'cmp': 25291510, 'peak_blocks_in_mem': 100}) - SingleHeapMergeIterator, half-overlapping: 11.020619154s Counter({'cmp': 10266988, 'peak_blocks_in_mem': 3}) - DoubleHeapMergeIterator, half-overlapping: 3.72211503983s Counter({'cmp': 1178497, 'peak_blocks_in_mem': 3}) - TripleHeapMergeIterator, half-overlapping: 3.52963089943s Counter({'cmp': 1071682, 'peak_blocks_in_mem': 3}) - NaiveMergeIterator, non-overlapping: 44.3896560669s Counter({'cmp': 25958482, 'peak_blocks_in_mem': 100}) - SingleHeapMergeIterator, non-overlapping: 10.9636461735s Counter({'cmp': 10598336, 'peak_blocks_in_mem': 1}) - DoubleHeapMergeIterator, non-overlapping: 2.80402898788s Counter({'cmp': 4021, 'peak_blocks_in_mem': 1}) - TripleHeapMergeIterator, non-overlapping: 2.83524298668s Counter({'cmp': 4021, 'peak_blocks_in_mem': 1}) - NaiveMergeIterator, overlapping: 80.1467709541s Counter({'cmp': 47662665, 'peak_blocks_in_mem': 100}) - SingleHeapMergeIterator, overlapping: 9.61102318764s Counter({'cmp': 8554237, 'peak_blocks_in_mem': 100}) - DoubleHeapMergeIterator, overlapping: 9.68881893158s Counter({'cmp': 8553345, 'peak_blocks_in_mem': 100}) - TripleHeapMergeIterator, overlapping: 9.55243206024s Counter({'cmp': 8563292, 
'peak_blocks_in_mem': 100}) - NaiveMergeIterator, real: 1099763.37405s Counter({'cmp': 578660759971, 'peak_blocks_in_mem': 1294}) - SingleHeapMergeIterator, real: 30513.3831122s Counter({'cmp': 30785961774, 'peak_blocks_in_mem': 5}) - DoubleHeapMergeIterator, real: 7987.11197996s Counter({'cmp': 4173739455, 'peak_blocks_in_mem': 15}) - TripleHeapMergeIterator, real: 7155.59520698s Counter({'cmp': 2784969619, 'peak_blocks_in_mem': 5}) Change-Id: I6ae1d2f9e4f41337f475146c648cbab122395f83 Reviewed-on: http://gerrit.cloudera.org:8080/12587 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <[email protected]> --- src/kudu/experiments/merge-test.py | 519 +++++++++++++++++++++++++++++++++++++ 1 file changed, 519 insertions(+) diff --git a/src/kudu/experiments/merge-test.py b/src/kudu/experiments/merge-test.py new file mode 100755 index 0000000..feddee3 --- /dev/null +++ b/src/kudu/experiments/merge-test.py @@ -0,0 +1,519 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from collections import Counter +from functools import total_ordering +import glob +import gzip +import heapq +import io +import logging +import random +import time +import unittest + +# Adjustable experiment parameters. 
BLOCK_SIZE = 1000
MAX_ITEM = 1000000
NUM_ITERATORS = 100
MIN_ITEMS_PER_ITERATOR = 1
MAX_ITEMS_PER_ITERATOR = 10000

# Lazy integer range: xrange on Python 2, range on Python 3. Used to sample from
# large ranges without materializing them as lists.
try:
  _lazy_range = xrange  # noqa: F821 (Python 2 only)
except NameError:
  _lazy_range = range

def _chomp(line):
  """ Return 'line' without its trailing newline, if it has one. """
  if line.endswith(b"\n"):
    return line[:-1]
  return line

class MyHeap(object):
  """
  Min-heap with a custom key function. See https://stackoverflow.com/a/8875823.

  Entries are stored as (key, sequence number, item) tuples. The key is computed
  once, when the item is inserted, so an item must not change its key while it
  sits in the heap. The increasing sequence number breaks ties between equal
  keys: the items themselves are never compared (they need not be orderable),
  and items with equal keys are popped in insertion order.
  """
  def __init__(self, initial=None, key=lambda x: x):
    self.key = key
    self._next_seq = 0
    self._data = []
    if initial:
      self._data = [self._entry(item) for item in initial]
      heapq.heapify(self._data)

  def _entry(self, item):
    """ Wrap 'item' in a heap entry tagged with a fresh sequence number. """
    entry = (self.key(item), self._next_seq, item)
    self._next_seq += 1
    return entry

  def push(self, item):
    heapq.heappush(self._data, self._entry(item))

  def pop(self):
    """ Remove and return the item with the smallest key. """
    return heapq.heappop(self._data)[-1]

  def top(self):
    """ Return, without removing it, the item with the smallest key. """
    return self._data[0][-1]

  def __len__(self):
    return len(self._data)

  def __str__(self):
    return ', '.join(str(e[0]) for e in self._data)

class BlockIterator(object):
  """ Iterator of generic items. Returns items on a block basis. """

  def __init__(self, items, block_size=None):
    """
    items: non-empty list of items, expected to be sorted (generate_input
           sorts them; the merge iterators rely on that order)
    block_size: maximum number of items per block; defaults to BLOCK_SIZE
    """
    assert len(items) > 0
    self.items = items
    self.block_size = BLOCK_SIZE if block_size is None else block_size
    assert self.block_size > 0
    self.next_block_idx = 0
    # Bounds are available before any block is paged in; PagingBlockIterator
    # exposes them so merge iterators can order not-yet-paged iterators.
    self.lower_bound = self.items[0]
    self.upper_bound = self.items[-1]

  def next_block(self):
    """ Return a list containing the next block of items. """
    start = self.next_block_idx
    block = self.items[start:start + self.block_size]
    assert len(block) > 0
    self.next_block_idx += len(block)
    return block

  def has_next(self):
    return self.next_block_idx < len(self.items)

  def __str__(self):
    return ", ".join(str(i) for i in self.items[self.next_block_idx:])

class FileBlockIterator(object):
  """
  Iterator of generic items loaded from an (optionally gzipped) file, one item
  per line. Returns items on a block basis.

  When constructed for a file with name 'foo', expects to find a file with name
  'foo.firstlast' containing two lines: the first and last items in 'foo'.

  The data file is closed automatically once it has been fully consumed; call
  close() to release it earlier.
  """

  def __init__(self, filename, stats, block_size=None):
    """
    filename: path of the data file; a ".gz" suffix selects gzip decoding
    stats: collections.Counter shared with the comparison-counting items
    block_size: maximum number of items per block; defaults to BLOCK_SIZE
    """
    self.stats = stats
    self.filename = filename
    self.block_size = BLOCK_SIZE if block_size is None else block_size
    assert self.block_size > 0

    # Read the bounds before opening the data file so that a missing or
    # malformed .firstlast file cannot leak the data file handle.
    with open(self.filename + ".firstlast", "rb") as flfh:
      first = _chomp(flfh.readline())
      assert len(first) > 0
      last = _chomp(flfh.readline())
      assert len(last) > 0
    self.lower_bound = ComparisonCountingObject(first, self.stats)
    self.upper_bound = ComparisonCountingObject(last, self.stats)

    if self.filename.endswith(".gz"):
      self.fh = gzip.open(self.filename, "rb")
    else:
      self.fh = open(self.filename, "rb")
    self.br = io.BufferedReader(self.fh)

    # One line of lookahead (raw, newline included); empty means EOF.
    self.next_line = b""
    self._read_ahead()

  def _read_ahead(self):
    """ Buffer the next raw line, closing the file once EOF is reached. """
    self.next_line = self.br.readline()
    if not self.next_line:
      self.close()

  def close(self):
    """ Close the data file. Safe to call more than once. """
    # Closing the BufferedReader also closes the wrapped file object.
    self.br.close()

  def next_block(self):
    """ Return a list containing the next block of items. """
    assert self.has_next()
    block = []
    while self.has_next() and len(block) < self.block_size:
      block.append(ComparisonCountingObject(self.next(), self.stats))
    return block

  def next(self):
    """ Return the next raw item, without its trailing newline. """
    assert self.has_next()
    line = self.next_line
    self._read_ahead()
    return _chomp(line)

  def has_next(self):
    return len(self.next_line) > 0

  def __str__(self):
    return self.filename

class PagingBlockIterator(object):
  """
  Iterator-like object that pages an entire block of items into memory and
  provides finer-grained access to the items.

  stats['blocks_in_mem'] counts the blocks currently paged in by all iterators
  sharing 'stats'; stats['peak_blocks_in_mem'] records its maximum.
  """

  def __init__(self, block_iter, stats):
    """
    block_iter: BlockIterator or FileBlockIterator
    stats: collections.Counter for keeping track of perf stats
    """
    self.block_iter = block_iter
    self.stats = stats
    self.paged_block = None
    self.paged_block_idx = -1
    self.paged_one_block = False

  def page_next_block(self):
    """
    Replace the current (fully consumed) block with the next block from
    'block_iter', or drop it if 'block_iter' is exhausted.
    """
    # Block must be fully consumed before paging the next one.
    assert (not self.paged_block or
            len(self.paged_block) == self.paged_block_idx + 1)

    if not self.block_iter.has_next():
      if self.paged_block is not None:
        self.stats['blocks_in_mem'] -= 1
      self.paged_block = None
      return

    next_block = self.block_iter.next_block()
    assert len(next_block) > 0
    if self.paged_block is None:
      # Only going from "nothing paged" to "one block paged" adds a block to
      # memory; swapping one block for the next leaves the count unchanged.
      self.stats['blocks_in_mem'] += 1
      self.stats['peak_blocks_in_mem'] = max(self.stats['peak_blocks_in_mem'],
                                             self.stats['blocks_in_mem'])
    self.paged_block = next_block
    self.paged_block_idx = 0
    self.paged_one_block = True

  def advance(self):
    """
    Step past the current item.

    Returns True if that finished the paged block (the next block, if any, has
    then been paged in), False if the iterator is still inside the same block.
    """
    assert self.paged_block
    if self.paged_block_idx + 1 == len(self.paged_block):
      self.page_next_block()
      return True
    self.paged_block_idx += 1
    return False

  def has_next(self):
    return bool(self.paged_block) or self.block_iter.has_next()

  def has_ever_paged(self):
    return self.paged_one_block

  # min(), max() and cur() fall back to the underlying iterator's bounds until
  # the first block is paged in. They must not be called once the iterator is
  # exhausted (paged_block is None at that point).
  def min(self):
    return (self.paged_block[0]
            if self.has_ever_paged() else self.block_iter.lower_bound)

  def max(self):
    return (self.paged_block[-1]
            if self.has_ever_paged() else self.block_iter.upper_bound)

  def cur(self):
    return (self.paged_block[self.paged_block_idx]
            if self.has_ever_paged() else self.block_iter.lower_bound)

  def __str__(self):
    parts = []
    if self.paged_block:
      parts.append(", ".join(
          str(i) for i in self.paged_block[self.paged_block_idx:]))
    suffix = str(self.block_iter)
    if suffix:
      parts.append(suffix)
    # Join only the non-empty parts so no stray leading separator appears.
    return ", ".join(parts)

def remove_dead_iters(iters):
  """
  Convenience method to filter out any fully-consumed iterators.
  """
  return [i for i in iters if i.has_next()]

class NaiveMergeIterator(object):
  """
  Simple merge iterator that uses no optimizations whatsoever. Every call to
  next() iterates over all live iterators and returns the smallest item.
  """
  def __init__(self, iters):
    self.iters = remove_dead_iters(iters)

    # This iterator ignores bounds, so the first block of every iterator must
    # be paged in up front.
    for i in self.iters:
      i.page_next_block()

  def next(self):
    smallest_iter = None
    for i in self.iters:
      if smallest_iter is None or i.cur() < smallest_iter.cur():
        smallest_iter = i
    assert smallest_iter is not None
    item = smallest_iter.cur()
    smallest_iter.advance()
    self.iters = remove_dead_iters(self.iters)
    return item

  def has_next(self):
    return len(self.iters) > 0

class SingleHeapMergeIterator(object):
  """
  More sophisticated merge iterator that uses a heap to optimize next() calls.

  Initially, the underlying iterators' bounds are used to establish heap order.
  When an iterator is next (i.e. when its first item is the global minimum), the
  first block is paged in and the iterator is put back in the heap. In the
  steady state, blocks are paged in as iterators are advanced and the heap is
  reordered with every call to next().
  """
  def __init__(self, iters):
    self.iters = MyHeap(remove_dead_iters(iters), key=lambda x: x.cur())

  def next(self):
    while True:
      smallest_iter = self.iters.pop()
      if smallest_iter.has_ever_paged():
        break
      # Its heap position came from its lower bound; page in the first block
      # and re-insert it before retrying.
      smallest_iter.page_next_block()
      self.iters.push(smallest_iter)

    item = smallest_iter.cur()
    smallest_iter.advance()
    if smallest_iter.has_next():
      self.iters.push(smallest_iter)
    return item

  def has_next(self):
    return len(self.iters) > 0

class DoubleHeapMergeIterator(object):
  """
  Hot/cold heap-based merge iterator.

  This variant assigns iterators to two heaps. The "hot" heap includes all
  iterators currently needed to perform the merge, while the "cold" heap
  contains the rest.

  While algorithmically equivalent to the basic heap-based merge iterator, the
  amount of heap reordering is typically less due to the reduced size of the
  working set (i.e. the size of the hot heap). This is especially true when the
  input iterators do not overlap, as that allows the algorithm to maximize the
  size of the cold heap.
  """
  def __init__(self, iters):
    self.cold = MyHeap(remove_dead_iters(iters), key=lambda x: x.cur())
    self.hot = MyHeap([], key=lambda x: x.cur())
    self._refill_hot()

  def _refill_hot(self):
    """
    Move iterators from 'cold' to 'hot' while the smallest item in 'cold' is
    within the merge window, whose upper end is taken to be the current block
    max of the iterator at the top of 'hot'.
    """
    while len(self.cold) > 0 and (len(self.hot) == 0 or
                                  self.hot.top().max() >= self.cold.top().cur()):
      warmest = self.cold.pop()
      if not warmest.has_ever_paged():
        # Page in the first block and retry.
        warmest.page_next_block()
        self.cold.push(warmest)
        continue
      self.hot.push(warmest)

  def next(self):
    smallest_iter = self.hot.pop()

    item = smallest_iter.cur()
    paged_new_block = smallest_iter.advance()
    is_dead = not smallest_iter.has_next()

    if is_dead:
      self._refill_hot()
    elif paged_new_block:
      if len(self.hot) > 0 and self.hot.top().max() < smallest_iter.cur():
        # 'smallest_iter' is no longer in the merge window.
        self.cold.push(smallest_iter)
      else:
        self.hot.push(smallest_iter)
      self._refill_hot()
    else:
      # Still inside a block that was already in the merge window, so the
      # window's upper end is unchanged and no refill is needed.
      self.hot.push(smallest_iter)
    return item

  def has_next(self):
    return len(self.hot) > 0 or len(self.cold) > 0

class TripleHeapMergeIterator(object):
  """
  Advanced hot/cold heap-based merge iterator.

  Like DoubleHeapMergeIterator but uses an additional heap (of the result of
  max() in each iterator found in "hot") to more accurately track the top end
  of the merge window. The result is an even smaller hot heap.
  """
  def __init__(self, iters):
    self.cold = MyHeap(remove_dead_iters(iters), key=lambda x: x.cur())
    self.hot = MyHeap([], key=lambda x: x.cur())
    self.hotmaxes = MyHeap([])
    self._refill_hot()

  def _refill_hot(self):
    """
    Move iterators from 'cold' to 'hot' while the smallest item in 'cold' does
    not exceed the smallest block max among the 'hot' iterators.
    """
    while len(self.cold) > 0 and (len(self.hotmaxes) == 0 or
                                  self.hotmaxes.top() >= self.cold.top().cur()):
      warmest = self.cold.pop()
      if not warmest.has_ever_paged():
        # Page in the first block and retry.
        warmest.page_next_block()
        self.cold.push(warmest)
        continue
      self.hot.push(warmest)
      self.hotmaxes.push(warmest.max())

  def next(self):
    smallest_iter = self.hot.pop()
    # Defer pop of hotmaxes; it only needs to happen if we've finished a block.

    item = smallest_iter.cur()
    paged_new_block = smallest_iter.advance()
    is_dead = not smallest_iter.has_next()

    if is_dead:
      # The finished block's max was the smallest value in 'hotmaxes'.
      self.hotmaxes.pop()
      self._refill_hot()
    elif paged_new_block:
      self.hotmaxes.pop()
      if len(self.hotmaxes) > 0 and self.hotmaxes.top() < smallest_iter.cur():
        # 'smallest_iter' is no longer in the merge window.
        self.cold.push(smallest_iter)
      else:
        self.hot.push(smallest_iter)
        self.hotmaxes.push(smallest_iter.max())
      self._refill_hot()
    else:
      self.hot.push(smallest_iter)
    return item

  def has_next(self):
    return len(self.hot) > 0 or len(self.cold) > 0

@total_ordering
class ComparisonCountingObject(object):
  """
  Wrapper that adds one to stats['cmp'] for every comparison it takes part in.

  == and < are implemented directly; total_ordering derives the rest.
  """
  def __init__(self, val, stats):
    self.val = val
    self.stats = stats

  def __eq__(self, rhs):
    assert isinstance(rhs, ComparisonCountingObject)
    self.stats['cmp'] += 1
    return self.val == rhs.val

  def __ne__(self, rhs):
    # Python 2 does not derive != from ==, so spell it out explicitly.
    return not self == rhs

  def __lt__(self, rhs):
    assert isinstance(rhs, ComparisonCountingObject)
    self.stats['cmp'] += 1
    return self.val < rhs.val

  def __str__(self):
    return str(self.val)

class ComparisonCountingInt(ComparisonCountingObject):
  """
  Comparison-counting wrapper for the synthetic integer inputs.

  Inherits the rich comparison operators (one count per comparison) so that it
  works on both Python 2 and Python 3; a __cmp__-only class is not comparable
  on Python 3.
  """
  pass

class TestMerges(unittest.TestCase):
  # Show full diffs when the merged output does not match.
  maxDiff = None

  def generate_input(self, pattern):
    """
    Generate NUM_ITERATORS sorted lists, each holding distinct random ints laid
    out according to 'pattern' ('overlapping', 'non-overlapping' or
    'half-overlapping'). Returns (lists_of_items, expected_merged_items).
    """
    lists_of_items = []
    expected_items = []
    last_item = 0
    for _ in range(NUM_ITERATORS):
      if pattern == 'overlapping':
        min_item = 0
      elif pattern == 'non-overlapping':
        min_item = last_item
      elif pattern == 'half-overlapping':
        # Floor division keeps this an int on Python 3 as well.
        min_item = last_item - MAX_ITEM // 2
      else:
        raise ValueError("Unknown input pattern: {}".format(pattern))
      num_items = random.randint(MIN_ITEMS_PER_ITERATOR, MAX_ITEMS_PER_ITERATOR)
      items = sorted(random.sample(_lazy_range(min_item, MAX_ITEM + min_item),
                                   num_items))
      lists_of_items.append(items)
      expected_items.extend(items)
      last_item = items[-1]
    expected_items.sort()
    return (lists_of_items, expected_items)

  def run_merge(self, merge_type, pattern, lists_of_items, list_of_files, expected_results):
    """
    Merge either 'lists_of_items' or, if that is empty/None, the files in
    'list_of_files' with 'merge_type', logging progress and stats. When
    'expected_results' is not None the merged output is checked against it.
    """
    stats = Counter()
    start = time.time()
    if lists_of_items:
      iters = [PagingBlockIterator(BlockIterator([ComparisonCountingInt(i, stats) for i in items]),
                                   stats) for items in lists_of_items]
    else:
      assert list_of_files
      iters = [PagingBlockIterator(FileBlockIterator(f, stats),
                                   stats) for f in list_of_files]
    logging.info("Starting merge with {}".format(merge_type.__name__))
    merge_iter = merge_type(iters)
    logging.info("Initialized iterator")
    results = []
    num_results = 0
    t1 = start
    while merge_iter.has_next():
      n = merge_iter.next().val
      num_results += 1
      if expected_results is not None:
        results.append(n)

      t2 = time.time()
      if t2 - t1 > 10:
        logging.info("Merged {} elements ({} eps) {}".format(
            num_results,
            num_results / (t2 - start),
            repr(stats)))
        t1 = t2
    elapsed = time.time() - start
    logging.info("Merged {} elements".format(num_results))
    if expected_results is not None:
      self.assertEqual(expected_results, results)
    logging.info("{} with {} input: {}s {}".format(
        merge_type.__name__,
        pattern,
        elapsed,
        repr(stats)))

  def _do_test(self, pattern):
    lists_of_items, expected_items = self.generate_input(pattern)

    # Commented out because it's too slow with the current parameters.
    #
    # self.run_merge(NaiveMergeIterator, pattern, lists_of_items, None, expected_items)
    self.run_merge(SingleHeapMergeIterator, pattern, lists_of_items, None, expected_items)
    self.run_merge(DoubleHeapMergeIterator, pattern, lists_of_items, None, expected_items)
    self.run_merge(TripleHeapMergeIterator, pattern, lists_of_items, None, expected_items)

  def test_overlapping_input(self):
    self._do_test('overlapping')

  def test_nonoverlapping_input(self):
    self._do_test('non-overlapping')

  def test_half_overlapping_input(self):
    self._do_test('half-overlapping')

  def test_real_input(self):
    list_of_files = glob.glob("rowset_keys_*.gz")
    if len(list_of_files) == 0:
      self.skipTest("No real input found")

    # Commented out because it's too slow with the current parameters.
    #
    # self.run_merge(NaiveMergeIterator, "real", None, list_of_files, None)
    self.run_merge(SingleHeapMergeIterator, "real", None, list_of_files, None)
    self.run_merge(DoubleHeapMergeIterator, "real", None, list_of_files, None)
    self.run_merge(TripleHeapMergeIterator, "real", None, list_of_files, None)

if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO,
                      format='%(asctime)s %(levelname)s: %(message)s')
  unittest.main()
