This is what i'm thinking of using as a base so as to reuse work when
changing the structure to use index regions between writes. It makes large
and shallow indexes atm.
import bisect
class Chunk:
def __init__(self, start, end, data, height=0, leaf_count=1, age=0):
self.start = start
self.end = end
self.data = data
self.height = height
self.leaf_count = leaf_count
self.age = age
def __len__(self):
return self.end - self.start
def is_leaf(self):
return self.height == 0
def is_full(self):
return self.leaf_count == (1 << self.height)
def is_power_of_2(self):
v = self.leaf_count
return v & (v - 1) == 0
class Flush(Chunk):
class Entry(Chunk):
def __init__(self, start, end, chunk, path = [], height=0, leaf_count=1):
super().__init__(start, end, chunk, height=height, leaf_count=leaf_count, age=chunk.age)
self.path = list(path)
self.path.append(self.data)
self.single_child_descendents = []
self.child_count = 0
if not chunk.is_leaf() and (leaf_count==1 or height==0):
assert leaf_count==1 and height==0
# calculate leaf_count and height
self.leaf_count = 0
self.height = 1
for entry in self.flush_entries():
self.leaf_count += entry.leaf_count
self.height = max(self.height, entry.height + 1)
self.child_count += 1
if entry.single_child_descendents:
self.single_child_descendents.extend(entry.single_child_descendents)
elif entry.child_count == 1:
self.single_child_descendents.append(entry)
def flush_entries(self):
assert not self.data.is_leaf()
return (
Flush.Entry(max(entry.start, self.start), min(entry.end, self.end), entry.data, self.path)
for entry in self.data.data
if entry.start < self.end and entry.end > self.start
)
def chunk_data(self):
assert self.is_leaf()
return self.data.data[self.start - self.data.start : self.end - self.data.start]
def __init__(self, prev_flush = None):
if prev_flush is not None:
super().__init__(prev_flush.start, prev_flush.end, [], height=1, leaf_count=0, age=prev_flush.age+1)
self.max_height = prev_flush.leaf_count.bit_length()
prev_entry = Flush.Entry(self.start, self.end, prev_flush)
self.add(prev_entry)
else:
super().__init__(None, None, [], height=1, leaf_count=0)
self.max_height = 1
def add(self, *adjacents):
adjacents = list(adjacents)
if self.start is None:
self.start = adjacents[0].start
self.end = adjacents[-1].end
else:
self.start = min(self.start, adjacents[0].start)
self.end = max(self.end, adjacents[-1].end)
# expand adjacents that are too deep [should go after start_idx and end_idx are adjust, to find correct max_height easier]
idx = 0
# could update this algorithm to: accumulate things that aren't leaves
# after a run, calculate their shared max_height
# then shallow them all to that minus one, and we're the new root
# seems like the big update of interest
# change: now bringing up descendents without siblings
while idx < len(adjacents):
entry = adjacents[idx]
if entry.single_child_descendents:
print(f'merging {len(entry.single_child_descendents)} single children')
subadjacents = []
shallow_start = entry.start
for descendent in entry.single_child_descendents:
if shallow_start != descendent.start:
subadjacents.append(Flush.Entry(shallow_start, descendent.start, entry.data))
subadjacents.append(descendent)
shallow_start = descendent.end
if shallow_start != entry.end:
subadjacents.append(Flush.Entry(shallow_start, entry.end, entry.data))
adjacents[idx:idx+1] = subadjacents
#elif entry.height + 1 > self.max_height:
# print(f'expanding a branch with depth {entry.height+1}')
# subadjacents = []
# shallow_start = entry.start
# shallow_end = shallow_start
# for subentry in entry.flush_entries():
# if subentry.height + 2 > self.max_height:
# if shallow_end != shallow_start:
# subadjacents.append(Flush.Entry(shallow_start, shallow_end, entry.data))
# subadjacents.append(subentry)
# shallow_start = subentry.end
# shallow_end = subentry.end
# if shallow_end != shallow_start:
# subadjacents.append(Flush.Entry(shallow_start, shallow_end, entry.data))
# adjacents[idx:idx+1] = subadjacents
else:
idx += 1
# first idx with end >= start
start_idx = bisect.bisect_left([entry.end for entry in self.data], adjacents[0].start)
# first idx with start > end
end_idx = bisect.bisect_right([entry.start for entry in self.data], adjacents[-1].end, start_idx)
replaced = self.data[start_idx:end_idx]
if len(replaced):
if replaced[0].start < adjacents[0].start:
adjacents.insert(
0,
Flush.Entry(
replaced[0].start, adjacents[0].start, replaced[0].data
)
)
if start_idx > 0 and replaced[0].end > adjacents[1].start:
# the trimmed entry may have fewer leaves and itself merge with its neighbor
start_idx -= 1
replaced.insert(0, self.data[start_idx])
adjacents.insert(0, self.data[start_idx])
if replaced[-1].end > adjacents[-1].end:
adjacents.append(
Flush.Entry(
adjacents[-1].end, replaced[-1].end, replaced[-1].data
)
)
if end_idx < len(self.data) and replaced[-1].start < adjacents[-2].end:
# the trimmed entry may have fewer leaves and itself merge with its neighbor
replaced.append(self.data[end_idx])
adjacents.append(self.data[end_idx])
end_idx += 1
for idx, entry in reversed(list(enumerate(adjacents))):
if entry.leaf_count == 0:
# no leaves left in this branch, remove
adjacents.pop(idx)
continue
count = 0
subentry = entry
while count <= 1 and not subentry.data.is_leaf():
# make branches shallower by splicing out roots with only one child
parent_entry = subentry
count = 0
for subentry in parent_entry.flush_entries():
count += 1
if count > 1:
subentry = parent_entry
break
if subentry is not entry:
# some internodes were removed
adjacents[idx] = subentry
idx = len(adjacents) - 1
while idx > 0:
idx -= 1
left_adjacent = adjacents[idx]
right_adjacent = adjacents[idx+1]
# merge writes
if (
left_adjacent.age == self.age and
right_adjacent.age == self.age and
left_adjacent.end == right_adjacent.start
):
left_adjacent.data = Chunk(
left_adjacent.start,
right_adjacent.end,
left_adjacent.chunk_data() + right_adjacent.chunk_data(),
age = self.age
)
left_adjacent.end = right_adjacent.end
adjacents.pop(idx+1)
continue
# merge branches with shared parents
shared_path = [
left_parent for left_parent, right_parent
in zip(left_adjacent.path, right_adjacent.path)
if left_parent is right_parent
]
if len(shared_path) > 0 and left_adjacent.height + len(left_adjacent.path) - len(shared_path) < self.max_height and right_adjacent.height + len(right_adjacent.path) - len(shared_path) < self.max_height:
if left_adjacent.end != right_adjacent.start:
assert left_adjacent.end < right_adjacent.start
between_entry = Flush.Entry(
left_adjacent.end,
right_adjacent.start,
shared_path[-1]
)
if between_entry.leaf_count > 0:
# the shared root contains leaves in between that have been removed
continue
print(f'Merging {len(left_adjacent.path)}:{left_adjacent.height}, {len(right_adjacent.path)}:{right_adjacent.height} -> {len(shared_path)}:{left_adjacent.height + len(left_adjacent.path) - len(shared_path)}')
merged = Flush.Entry(
left_adjacent.start,
right_adjacent.end,
chunk = shared_path[-1],
path = shared_path,
# letting Entry recalculate these is a quick way to handle overlap
#leaf_count = left_adjacent.leaf_count + right_adjacent.leaf_count,
#height = left_adjacent.height + len(left_adjacent.path) - len(shared_path)
)
#assert merged.leaf_count == merged.data.check_leaf_count(merged.start, merged.end)
adjacents[idx:idx+2] = [merged]
# ensure all leaves are balanced trees with appropriate height
idx = 0
while idx < len(adjacents):
entry = adjacents[idx]
if entry.height >= self.max_height or not entry.is_full():
adjacents[idx:idx+1] = entry.flush_entries()
#elif not entry.is_full():
# subadjacents = []
# last_idx = idx
# last_end = entry.start
# count = 0
# for next_idx, subentry in enumerate(entry.flush_entries()):
# subadjacents.append(subentry)
# next_count = count + subentry.leaf_count
# if is_2_power(next_count):
# subadjacents.append(
#
# # - each entry is made of full entries
# # - so we want as many entries together as are still full
# # why is it not doing that already
# # once it is a power of 2 large, it should be treated as full
else:
idx += 1
self.data[start_idx:end_idx] = adjacents
# using Entry to recalculate leaf_count is a quick-to-implement way to handle not double-counting chunks that span trimmed groups
proxy_entry = Flush.Entry(self.data[0].start, self.data[-1].end, self)
self.leaf_count = proxy_entry.leaf_count
self.height = proxy_entry.height
#self.height = max((entry.height for entry in self.data)) + 1
#self.check_leaf_count(self.start, self.end)
assert self.leaf_count > 0
self.max_height = self.leaf_count.bit_length()
#assert self.max_height >= self.height # oops this isn't met yet due to dependency order above, calculations used last value
def write(self, offset, data):
chunk = Chunk(offset, offset + len(data), data, age=self.age)
entry = Flush.Entry(offset, offset + len(data), chunk)
return self.add(entry)
def read(self, start, max_end = float('inf')):
# first idx with end > start
idx = bisect.bisect_right([entry.end for entry in self.data], start)
if idx == len(self.data):
return bytes(4096)
entry = self.data[idx]
if entry.start > start:
end = min(max_end, entry.start)
return bytes(end - start)
end = min(max_end, entry.end)
if entry.data.is_leaf():
datastart = start - entry.data.start
dataend = end - entry.data.start
return entry.data.data[datastart : dataend]
else:
return entry.data.read(start, end)
def check_leaf_count(self, start, end):
leaf_count = 0
height = 1
wrapper = Flush.Entry(start, end, self)
for entry in wrapper.flush_entries():
if type(entry.data) is Flush:
entry_leaf_count = entry.data.check_leaf_count(entry.start, entry.end)
assert entry_leaf_count == entry.leaf_count
leaf_count += entry_leaf_count
else:
leaf_count += entry.data.leaf_count
height = max(height, entry.height + 1)
assert leaf_count == wrapper.leaf_count
assert height == wrapper.height
if start == self.start and end == self.end:
assert leaf_count == self.leaf_count
assert height == self.height # oops not met yet
return leaf_count
def main():
import random
random.seed(0)
SIZE=4096
comparison = bytearray(SIZE)
#import mmap
#comparison = mmap.mmap(-1, SIZE)
store = Flush()
def compare(store, comparison):
offset = 0
while offset < len(comparison):
data = store.read(offset)[:len(comparison) - offset]
assert data == comparison[offset:offset+len(data)]
offset += len(data)
store.check_leaf_count(store.start, store.end)
return True
for flushes in range(1024):
for writes in range(random.randint(1,16)):
start = random.randint(0, SIZE-1)
size = min(SIZE-start, random.randint(1, 1024))
#size = min(SIZE-start, random.randint(1, 128))
#start = len(comparison)
#size = random.randint(1,128)
end = start + size
data = random.getrandbits(size*8).to_bytes(size, 'little')
store.write(start, data)
comparison[start:end] = data
#compare(store, comparison)
#print('OK', len(store.data), 'x', store.height, '/', store.max_height, 'count =', store.leaf_count, 'flushes =', flushes, 'writes =', writes)#, offset)
compare(store, comparison)
print('OK', len(store.data), 'x', store.height, '/', store.max_height, '; count =', store.leaf_count, '; flushes =', flushes, '; full =', store.is_full())#, writes)#, offset)
store = Flush(prev_flush = store)
compare(store, comparison)
if __name__ == '__main__':
main()
#import cProfile
#cProfile.run('main()')