This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 054084fb69e2e55ad8ddd4b858ad5d4902db2d37
Author: Adar Dembo <[email protected]>
AuthorDate: Tue Feb 26 00:56:42 2019 -0800

    experiments: merge iterator optimization tests
    
    Here's a brief exploration into various MergeIterator algorithms, prototyped
    in Python. Only after I was done did I see that there was an existing
    experiment on this same subject in C++ (see merge-test.cc). It's not all
    wasted work though; that experiment didn't include the new "hot/cold" heap
    algorithms, nor did it account for all MergeIterator quirks such as paged
    blocks and lower/upper bounds.
    
    Below are some timing results on a big el7 machine. The "real" input was a
    representative (i.e. mostly compacted) 40GB tablet:
    - NaiveMergeIterator, half-overlapping: 44.7854778767s Counter({'cmp': 
25291510, 'peak_blocks_in_mem': 100})
    - SingleHeapMergeIterator, half-overlapping: 11.020619154s Counter({'cmp': 
10266988, 'peak_blocks_in_mem': 3})
    - DoubleHeapMergeIterator, half-overlapping: 3.72211503983s Counter({'cmp': 
1178497, 'peak_blocks_in_mem': 3})
    - TripleHeapMergeIterator, half-overlapping: 3.52963089943s Counter({'cmp': 
1071682, 'peak_blocks_in_mem': 3})
    - NaiveMergeIterator, non-overlapping: 44.3896560669s Counter({'cmp': 
25958482, 'peak_blocks_in_mem': 100})
    - SingleHeapMergeIterator, non-overlapping: 10.9636461735s Counter({'cmp': 
10598336, 'peak_blocks_in_mem': 1})
    - DoubleHeapMergeIterator, non-overlapping: 2.80402898788s Counter({'cmp': 
4021, 'peak_blocks_in_mem': 1})
    - TripleHeapMergeIterator, non-overlapping: 2.83524298668s Counter({'cmp': 
4021, 'peak_blocks_in_mem': 1})
    - NaiveMergeIterator, overlapping: 80.1467709541s Counter({'cmp': 47662665, 
'peak_blocks_in_mem': 100})
    - SingleHeapMergeIterator, overlapping: 9.61102318764s Counter({'cmp': 
8554237, 'peak_blocks_in_mem': 100})
    - DoubleHeapMergeIterator, overlapping: 9.68881893158s Counter({'cmp': 
8553345, 'peak_blocks_in_mem': 100})
    - TripleHeapMergeIterator, overlapping: 9.55243206024s Counter({'cmp': 
8563292, 'peak_blocks_in_mem': 100})
    - NaiveMergeIterator, real: 1099763.37405s Counter({'cmp': 578660759971, 
'peak_blocks_in_mem': 1294})
    - SingleHeapMergeIterator, real: 30513.3831122s Counter({'cmp': 
30785961774, 'peak_blocks_in_mem': 5})
    - DoubleHeapMergeIterator, real: 7987.11197996s Counter({'cmp': 4173739455, 
'peak_blocks_in_mem': 15})
    - TripleHeapMergeIterator, real: 7155.59520698s Counter({'cmp': 2784969619, 
'peak_blocks_in_mem': 5})
    
    Change-Id: I6ae1d2f9e4f41337f475146c648cbab122395f83
    Reviewed-on: http://gerrit.cloudera.org:8080/12587
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
---
 src/kudu/experiments/merge-test.py | 519 +++++++++++++++++++++++++++++++++++++
 1 file changed, 519 insertions(+)

diff --git a/src/kudu/experiments/merge-test.py 
b/src/kudu/experiments/merge-test.py
new file mode 100755
index 0000000..feddee3
--- /dev/null
+++ b/src/kudu/experiments/merge-test.py
@@ -0,0 +1,519 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import Counter
+from functools import total_ordering
+import glob
+import gzip
+import heapq
+import io
+import logging
+import random
+import time
+import unittest
+
+# Adjustable experiment parameters.
+BLOCK_SIZE = 1000
+MAX_ITEM = 1000000
+NUM_ITERATORS = 100
+MIN_ITEMS_PER_ITERATOR = 1
+MAX_ITEMS_PER_ITERATOR = 10000
+
+class MyHeap(object):
+  """ Heap with custom key comparator. See 
https://stackoverflow.com/a/8875823. """
+  def __init__(self, initial=None, key=lambda x:x):
+    self.key = key
+    if initial:
+      self._data = [(key(item), item) for item in initial]
+      heapq.heapify(self._data)
+    else:
+      self._data = []
+
+  def push(self, item):
+    heapq.heappush(self._data, (self.key(item), item))
+
+  def pop(self):
+    return heapq.heappop(self._data)[1]
+
+  def top(self):
+    return self._data[0][1]
+
+  def __len__(self):
+    return len(self._data)
+
+  def __str__(self):
+    return ', '.join(str(e[0]) for e in self._data)
+
+class BlockIterator(object):
+  """ Iterator of generic items. Returns items on a block basis. """
+  next_block_idx = 0
+
+  def __init__(self, items):
+    assert len(items) > 0
+    self.items = items
+    self.lower_bound = self.items[0]
+    self.upper_bound = self.items[-1]
+
+  def next_block(self):
+    """ Return an array containing the next block of items """
+    block = self.items[self.next_block_idx:self.next_block_idx + BLOCK_SIZE]
+    assert len(block) > 0
+    self.next_block_idx += len(block)
+    return block
+
+  def has_next(self):
+    return len(self.items) > 0 and self.next_block_idx < len(self.items)
+
+  def __str__(self):
+    return ", ".join(str(i) for i in self.items[self.next_block_idx:])
+
+class FileBlockIterator(object):
+  """
+  Iterator of generic items loaded from an (optionally gzipped) file. Returns
+  items on a block basis.
+
+  When constructed for a file with name 'foo', expects to find a file with name
+  'foo.firstlast' containing two lines: the first and last items in 'foo'.
+  """
+
+  def __init__(self, filename, stats):
+    self.stats = stats
+    self.filename = filename
+
+    if self.filename.endswith(".gz"):
+      self.fh = gzip.open(self.filename, "rb")
+    else:
+      self.fh = open(self.filename, "rb")
+    self.br = io.BufferedReader(self.fh)
+
+    with open(self.filename + ".firstlast", "rb") as flfh:
+      l = flfh.readline()[:-1]
+      assert len(l) > 0
+      self.lower_bound = ComparisonCountingObject(l, self.stats)
+      l = flfh.readline()[:-1]
+      assert len(l) > 0
+      self.upper_bound = ComparisonCountingObject(l, self.stats)
+
+    self.next_line = self.br.readline()
+
+  def next_block(self):
+    """ Return an array containing the next block of items """
+    assert self.has_next()
+    block = []
+    while True:
+      l = self.next()
+      block.append(ComparisonCountingObject(l, self.stats))
+      if not self.has_next():
+        # EOF
+        break
+      if len(block) == BLOCK_SIZE:
+        break
+    return block
+
+  def next(self):
+    assert self.has_next()
+    l = self.next_line
+    self.next_line = self.br.readline()
+    return l[:-1]
+
+  def has_next(self):
+    return len(self.next_line) > 0
+
+  def __str__(self):
+    return self.filename
+
+class PagingBlockIterator(object):
+  """
+  Iterator-like object that pages an entire block of items into memory and
+  provides finer-grained access to the items.
+  """
+
+  def __init__(self, block_iter, stats):
+    """
+    block_iter: BlockIterator
+    stats: collections.Counter for keeping track of perf stats
+    """
+    self.block_iter = block_iter
+    self.stats = stats
+    self.paged_block = None
+    self.paged_block_idx = -1
+    self.paged_one_block = False
+
+  def page_next_block(self):
+    # Block must be fully consumed before paging the next one.
+    assert not self.paged_block or len(self.paged_block) == 
self.paged_block_idx + 1
+
+    if self.block_iter.has_next():
+      next_block = self.block_iter.next_block()
+      assert len(next_block) > 0
+
+      if self.paged_block is None:
+        self.stats['blocks_in_mem'] += 1
+        self.stats['peak_blocks_in_mem'] = 
max(self.stats['peak_blocks_in_mem'],
+                                               self.stats['blocks_in_mem'])
+
+      self.paged_block = next_block
+      self.paged_block_idx = 0
+      self.paged_one_block = True
+    else:
+      if self.paged_block is not None:
+        self.stats['blocks_in_mem'] -= 1
+      self.paged_block = None
+
+  def advance(self):
+    assert self.paged_block
+
+    item = self.paged_block[self.paged_block_idx]
+    if len(self.paged_block) == self.paged_block_idx + 1:
+      self.page_next_block()
+      return True
+    else:
+      self.paged_block_idx += 1
+      return False
+
+  def has_next(self):
+    return self.paged_block or self.block_iter.has_next()
+
+  def has_ever_paged(self):
+    return self.paged_one_block
+
+  def min(self):
+    return (self.paged_block[0]
+            if self.has_ever_paged() else self.block_iter.lower_bound)
+
+  def max(self):
+    return (self.paged_block[-1]
+            if self.has_ever_paged() else self.block_iter.upper_bound)
+
+  def cur(self):
+    return (self.paged_block[self.paged_block_idx]
+            if self.has_ever_paged() else self.block_iter.lower_bound)
+
+  def __str__(self):
+    s = ""
+    if self.paged_block:
+      s += ", ".join(str(i) for i in self.paged_block[self.paged_block_idx:])
+    suffix = str(self.block_iter)
+    if suffix:
+      s += ", "
+      s += suffix
+    return s
+
+def remove_dead_iters(iters):
+  """
+  Convenience method to filter out any fully-consumed iterators.
+  """
+  live_iters = []
+  for i in iters:
+    if i.has_next():
+      live_iters.append(i)
+  return live_iters
+
+class NaiveMergeIterator(object):
+  """
+  Simple merge iterator that uses no optimizations whatsoever. Every call to
+  next() iterates over all live iterators and returns the smallest item.
+  """
+  def __init__(self, iters):
+    self.iters = remove_dead_iters(iters)
+
+    # This iterator ignores bounds, so we must page in all blocks up front.
+    [i.page_next_block() for i in self.iters]
+
+  def next(self):
+    smallest_iter = None
+    for i in self.iters:
+      if not smallest_iter or i.cur() < smallest_iter.cur():
+        smallest_iter = i
+    assert smallest_iter
+    item = smallest_iter.cur()
+    smallest_iter.advance()
+    self.iters = remove_dead_iters(self.iters)
+    return item
+
+  def has_next(self):
+    return len(self.iters) > 0
+
+class SingleHeapMergeIterator(object):
+  """
+  More sophisticated merge iterator that uses a heap to optimize next() calls.
+
+  Initially, the underlying iterators' bounds are used to establish heap order.
+  When an iterator is next (i.e. when its first item is the global minimum), 
the
+  first block is paged in and the iterator is put back in the heap. In the
+  steady state, blocks are paged in as iterators are advanced and the heap is
+  reordered with every call to next().
+  """
+  def __init__(self, iters):
+    self.iters = MyHeap(remove_dead_iters(iters), key=lambda x : x.cur())
+
+  def next(self):
+    smallest_iter = None
+    while True:
+      smallest_iter = self.iters.pop()
+      if not smallest_iter.has_ever_paged():
+        # Page in the first block and retry.
+        smallest_iter.page_next_block()
+        self.iters.push(smallest_iter)
+        continue
+      break
+
+    item = smallest_iter.cur()
+    smallest_iter.advance()
+    if smallest_iter.has_next():
+      self.iters.push(smallest_iter)
+    return item
+
+  def has_next(self):
+    return len(self.iters) > 0
+
+class DoubleHeapMergeIterator(object):
+  """
+  Hot/cold heap-based merge iterator.
+
+  This variant assigns iterators to two heaps. The "hot" heap includes all
+  iterators currently needed to perform the merge, while the "cold" heap
+  contains the rest.
+
+  While algorithmically equivalent to the basic heap-based merge iterator, the
+  amount of heap reordering is typically less due to the reduced size of the
+  working set (i.e. the size of the hot heap). This is especially true when the
+  input iterators do not overlap, as that allows the algorithm to maximize the
+  size of the cold heap.
+  """
+  def __init__(self, iters):
+    self.cold = MyHeap(remove_dead_iters(iters), key=lambda x : x.cur())
+    self.hot = MyHeap([], key=lambda x : x.cur())
+    self._refill_hot()
+
+  def _refill_hot(self):
+    while len(self.cold) > 0 and (len(self.hot) == 0 or
+                                  self.hot.top().max() >= 
self.cold.top().cur()):
+      warmest = self.cold.pop()
+      if not warmest.has_ever_paged():
+        # Page in the first block and retry.
+        warmest.page_next_block()
+        self.cold.push(warmest)
+        continue
+      self.hot.push(warmest)
+
+  def next(self):
+    smallest_iter = self.hot.pop()
+
+    item = smallest_iter.cur()
+    paged_new_block = smallest_iter.advance()
+    is_dead = not smallest_iter.has_next()
+
+    if is_dead:
+      self._refill_hot()
+    elif paged_new_block:
+      if len(self.hot) > 0 and self.hot.top().max() < smallest_iter.cur():
+        # 'smallest_iter' is no longer in the merge window.
+        self.cold.push(smallest_iter)
+      else:
+        self.hot.push(smallest_iter)
+      self._refill_hot()
+    else:
+      self.hot.push(smallest_iter)
+    return item
+
+  def has_next(self):
+    return len(self.hot) > 0 or len(self.cold) > 0
+
+class TripleHeapMergeIterator(object):
+  """
+  Advanced hot/cold heap-based merge iterator.
+
+  Like DoubleHeapMergeIterator but uses an additional heap (of the result of
+  max() in each iterator found in "hot") to more accurately track the top end
+  of the merge window. The result is an even smaller hot heap.
+  """
+  def __init__(self, iters):
+    self.cold = MyHeap(remove_dead_iters(iters), key=lambda x : x.cur())
+    self.hot = MyHeap([], key=lambda x : x.cur())
+    self.hotmaxes = MyHeap([])
+    self._refill_hot()
+
+  def _refill_hot(self):
+    while len(self.cold) > 0 and (len(self.hotmaxes) == 0 or
+                                  self.hotmaxes.top() >= 
self.cold.top().cur()):
+      warmest = self.cold.pop()
+      if not warmest.has_ever_paged():
+        # Page in the first block and retry.
+        warmest.page_next_block()
+        self.cold.push(warmest)
+        continue
+      self.hot.push(warmest)
+      self.hotmaxes.push(warmest.max())
+
+  def next(self):
+    smallest_iter = self.hot.pop()
+    # Defer pop of hotmaxes; it only needs to happen if we've finished a block.
+
+    item = smallest_iter.cur()
+    paged_new_block = smallest_iter.advance()
+    is_dead = not smallest_iter.has_next()
+
+    if is_dead:
+      self.hotmaxes.pop()
+      self._refill_hot()
+    elif paged_new_block:
+      self.hotmaxes.pop()
+      if len(self.hotmaxes) > 0 and self.hotmaxes.top() < smallest_iter.cur():
+        # 'smallest_iter' is no longer in the merge window.
+        self.cold.push(smallest_iter)
+      else:
+        self.hot.push(smallest_iter)
+        self.hotmaxes.push(smallest_iter.max())
+      self._refill_hot()
+    else:
+      self.hot.push(smallest_iter)
+    return item
+
+  def has_next(self):
+    return len(self.hot) > 0 or len(self.cold) > 0
+
+@total_ordering
+class ComparisonCountingObject(object):
+  def __init__(self, val, stats):
+    self.val = val
+    self.stats = stats
+
+  def __eq__(self, rhs):
+    assert isinstance(rhs, ComparisonCountingObject)
+    self.stats['cmp'] += 1
+    return self.val.__eq__(rhs.val)
+
+  def __lt__(self, rhs):
+    assert isinstance(rhs, ComparisonCountingObject)
+    self.stats['cmp'] += 1
+    return self.val.__lt__(rhs.val)
+
+  def __str__(self):
+    return str(self.val)
+
+class ComparisonCountingInt(object):
+  def __init__(self, val, stats):
+    self.val = val
+    self.stats = stats
+
+  def __cmp__(self, rhs):
+    assert isinstance(rhs, ComparisonCountingInt)
+    self.stats['cmp'] += 1
+    return self.val.__cmp__(rhs.val)
+
+  def __str__(self):
+    return str(self.val)
+
+class TestMerges(unittest.TestCase):
+  maxDiff = 1e9
+  def generate_input(self, pattern):
+    lists_of_items = []
+    expected_items = []
+    last_item = 0
+    for i in xrange(NUM_ITERATORS):
+      if pattern == 'overlapping':
+        min_item = 0
+      elif pattern == 'non-overlapping':
+        min_item = last_item
+      elif pattern == 'half-overlapping':
+        min_item = last_item - MAX_ITEM / 2
+      num_items = random.randint(MIN_ITEMS_PER_ITERATOR, 
MAX_ITEMS_PER_ITERATOR)
+      items = random.sample(xrange(min_item, MAX_ITEM + min_item), num_items)
+      items.sort()
+      lists_of_items.append(items)
+      expected_items.extend(items)
+      last_item = items[-1]
+    expected_items.sort()
+    return (lists_of_items, expected_items)
+
+  def run_merge(self, merge_type, pattern, lists_of_items, list_of_files, 
expected_results):
+    stats = Counter()
+    start = time.time()
+    if lists_of_items:
+      iters = [PagingBlockIterator(BlockIterator([ComparisonCountingInt(i, 
stats) for i in l]),
+                                   stats) for l in lists_of_items]
+    else:
+      assert list_of_files
+      iters = [PagingBlockIterator(FileBlockIterator(f, stats),
+                                   stats) for f in list_of_files]
+    logging.info("Starting merge with {}".format(merge_type.__name__))
+    merge_iter = merge_type(iters)
+    logging.info("Initialized iterator")
+    results = []
+    num_results = 0
+    t1 = start
+    while merge_iter.has_next():
+      n = merge_iter.next().val
+      num_results += 1
+      if expected_results:
+        results.append(n)
+
+      t2 = time.time()
+      if t2 - t1 > 10:
+        logging.info("Merged {} elements ({} eps) {}".format(
+          num_results,
+          num_results / (t2 - start),
+          repr(stats)))
+        t1 = t2
+    elapsed = time.time() - start
+    logging.info("Merged {} elements".format(num_results))
+    if expected_results:
+      self.assertEqual(expected_results, results)
+    logging.info("{} with {} input: {}s {}".format(
+      merge_type.__name__,
+      pattern,
+      elapsed,
+      repr(stats)))
+
+  def _do_test(self, pattern):
+    lists_of_items, expected_items = self.generate_input(pattern)
+
+    # Commented out because it's too slow with the current parameters.
+    #
+    # self.run_merge(NaiveMergeIterator, pattern, lists_of_items, None, 
expected_items)
+    self.run_merge(SingleHeapMergeIterator, pattern, lists_of_items, None, 
expected_items)
+    self.run_merge(DoubleHeapMergeIterator, pattern, lists_of_items, None, 
expected_items)
+    self.run_merge(TripleHeapMergeIterator, pattern, lists_of_items, None, 
expected_items)
+
+  def test_overlapping_input(self):
+    self._do_test('overlapping')
+
+  def test_nonoverlapping_input(self):
+    self._do_test('non-overlapping')
+
+  def test_half_overlapping_input(self):
+    self._do_test('half-overlapping')
+
+  def test_real_input(self):
+    list_of_files = glob.glob("rowset_keys_*.gz")
+    if len(list_of_files) == 0:
+      self.skipTest("No real input found")
+
+    # Commented out because it's too slow with the current parameters.
+    #
+    # self.run_merge(NaiveMergeIterator, "real", None, list_of_files, None)
+    self.run_merge(SingleHeapMergeIterator, "real", None, list_of_files, None)
+    self.run_merge(DoubleHeapMergeIterator, "real", None, list_of_files, None)
+    self.run_merge(TripleHeapMergeIterator, "real", None, list_of_files, None)
+
+if __name__ == "__main__":
+  logging.basicConfig(level=logging.INFO,
+                      format='%(asctime)s %(levelname)s: %(message)s')
+  unittest.main()

Reply via email to