# works now
# I think there's a little redundancy in the use of the Chunk structure — not
# certain.
# It would make sense to add bisection to access spots in the sorted lists,
# rather than linear iteration.
# The number of children per node is staying about equal to the tree depth:
# 1024 flushes of 1-32 lengthy writes to a 4096 byte region.
# block tree structure
# blocks written to are leaves
# depth is log2 leaves
# when a flush is made, all blocks are written, and also enough nodes such that every leaf can be accessed within depth lookups.
# consider we have an existing tree
# with say m flushes, containing n leaves (or m leaves). we'll likely call it n.
# each flush shows which leaves it has
# additionally, with the final flush, each leaf has an existing depth.
# when we reflush, we need to provide a new index for any leaves that become too deep.
# which leaves are too deep?
# we could basically walk them all to find out. this would be a consistent first approach.
import math
class Simple:
    """Append-only byte store built from chained, immutable flush trees.

    Writes are buffered in ``self.pending`` as sorted, merged Chunks; each
    ``flush()`` turns them into a new ``Flush`` node chained onto the previous
    one.  The per-flush index rebuilding keeps every leaf reachable within
    roughly log2(leaf count) lookups (see the notes at the top of the file).
    """
    class Chunk:
        """A half-open byte range [start, end).

        ``data`` is either raw bytes (a pending or flushed write) or a Flush
        object when the Chunk serves as an index entry pointing into an older
        flush; ``len(data)`` supplies the default end in both cases.
        """
        def __init__(self, offset, data, end=None):
            self.start = offset
            self.data = data
            # the explicit ``end`` lets an index entry cover only a sub-range
            # of the referenced object
            self.end = self.start + len(self.data) if end is None else end
    class Flush:
        # flush has a list of new leaves, and a list of indexes to old leaves with ranges
        def __init__(self, *writes, prev_flush=None, verify:bytes=None):
            """Build a flush from ``writes`` (sorted, non-overlapping Chunks).

            prev_flush: the preceding Flush to chain onto, or None for the
                first flush.
            verify: optional full reference image of the store; when given,
                every rebuilt range is asserted against it (debug aid).
            """
            self.prev_flush = prev_flush
            self.data = writes
            # find extents
            start = min((write.start for write in self.data))
            end = max((write.end for write in self.data))
            if prev_flush is None:
                # first flush: nothing older to index
                self.start = start
                self.end = end
                self.index = []
                return
            self.start = min(start, prev_flush.start)
            self.end = max(end, prev_flush.end)
            # temporary one-entry index so self.leaves() below can walk the
            # union of the new writes and everything reachable via prev_flush
            self.index = [Simple.Chunk(prev_flush.start, prev_flush)]
            # find leaf count and leaf depths
            # => one path that could show optimizations to avoid the O(n) choices -> start by iterating prev_flush's leaves rather than these
            leaves = [*self.leaves()]
            # depth cap: every leaf must stay reachable within ~log2(n) hops
            max_depth = len(leaves).bit_length()
            self.index = []
            # ``depth``/``shared_parents`` track how deep the current (last)
            # index entry sits and which ancestor chunks recent leaves share
            depth = 0
            shared_parents = None
            for leaf_path, leaf_offset, leaf_data in leaves:
                if verify is not None:
                    assert verify[leaf_offset:leaf_offset+len(leaf_data)] == leaf_data
                leaf_end = leaf_offset + len(leaf_data)
                if not leaf_path:
                    #print('skipping', leaf_offset, leaf_end)
                    # references data that doesn't need indexing here, skip by it
                    if len(self.index):
                        self.index[-1].end = leaf_end
                    continue
                if len(self.index) and len(leaf_path) > depth and leaf_path[depth].data is self.index[-1].data:
                    # the data happens to be in the preceding index
                    # smallest depth that still keeps this leaf within the cap
                    min_depth = len(leaf_path) - max_depth
                    new_depth = max(depth, min_depth)
                    depth_change = new_depth - depth
                    if len(shared_parents) > depth_change and leaf_path[new_depth].data is shared_parents[depth_change].data:
                        #print('extending', self.index[-1].start, leaf_end)
                        # parents are shared enough to not reach the max depth if the preceding index is merged
                        if depth_change > 0:
                            # re-point the last entry at a deeper shared parent
                            old_index_entry = self.index[-1]
                            new_index_entry = Simple.Chunk(self.index[-1].start, shared_parents[depth_change].data, leaf_end)
                            self.index[-1] = new_index_entry
                            if verify is not None:
                                for verify_path, verify_offset, verify_data in self.leaves(new_index_entry.start, new_index_entry.end):
                                    assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                        else:
                            # same depth: just widen the last entry's range
                            self.index[-1].end = leaf_end
                            if verify is not None:
                                for verify_path, verify_offset, verify_data in self.leaves(self.index[-1].start, self.index[-1].end):
                                    assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                        # keep only ancestors still shared between this leaf
                        # and the (possibly re-pointed) last index entry
                        shared_parents = [parent for index, parent in zip(leaf_path[new_depth:], shared_parents[depth_change:]) if index.data is parent.data]
                        depth = new_depth
                        assert len(shared_parents)
                        continue
                # otherwise, make a new index entry
                if len(self.index):
                    if verify is not None:
                        for verify_path, verify_offset, verify_data in self.leaves(self.index[-1].start, self.index[-1].end):
                            assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                    # first make the last index more shallow based on shared_parents
                    if shared_parents[-1].data is not self.index[-1].data:
                        #print('shallowing', self.index[-1].start, self.index[-1].end)
                        old_index_entry = self.index[-1]
                        new_index_entry = Simple.Chunk(self.index[-1].start, shared_parents[-1].data, self.index[-1].end)
                        self.index[-1] = new_index_entry
                        if verify is not None:
                            for verify_path, verify_offset, verify_data in self.leaves(self.index[-1].start, self.index[-1].end):
                                assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                #print('index', leaf_offset, leaf_end)
                # initialise new index
                depth = max(0, len(leaf_path) - max_depth)
                shared_parents = leaf_path[depth:]
                shallow_index = leaf_path[depth]
                new_index_entry = Simple.Chunk(leaf_offset, shallow_index.data, leaf_end) # there may be some duplicate information storage regarding starts and ends, unsure. note iterating leaves gives separate starts and ends.
                if verify is not None:
                    for verify_path, verify_offset, verify_data in new_index_entry.data.leaves(new_index_entry.start, new_index_entry.end):
                        assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                self.index.append(new_index_entry)
                if verify is not None:
                    for verify_path, verify_offset, verify_data in self.index[-1].data.leaves(self.index[-1].start, self.index[-1].end):
                        assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                continue
            #print('indexes', self.index)
        def __len__(self):
            """Total byte span covered by this flush."""
            return self.end - self.start
        #def lookup(self, offset
        def leaves(self, start = None, end = None):
            """Yield (path, offset, data) for each leaf within [start, end).

            ``path`` is the list of index Chunks descended through (empty when
            the data was written directly in this flush); ``data`` is the
            bytes for that leaf starting at ``offset``.  Leaves are produced
            in ascending offset order; gaps between leaves are possible.
            """
            if start is None:
                start = self.start
            if end is None:
                end = self.end
            #print('leaves', self, start, end)
            offset = start
            # walk this flush's own writes and its index entries in lockstep,
            # preferring our own writes wherever they cover ``offset``
            data_iter = iter(enumerate(self.data))
            index_iter = iter(enumerate(self.index))
            next_write_idx, next_write = next(data_iter, (-1, None))
            next_index_idx, next_index = next(index_iter, (-1, None))
            while offset < end:
                if next_write is not None and offset >= next_write.start:
                    if offset < next_write.end:
                        # offset >= next_write
                        # so we look in the write
                        substart = offset - next_write.start
                        subend = min(end, next_write.end) - next_write.start
                        assert subend >= substart
                        #print('it write yielding', self, next_write_idx, offset, subend + next_write.start)
                        yield ([], offset, next_write.data[substart:subend])
                        offset += subend - substart
                        assert offset <= end
                    next_write_idx, next_write = next(data_iter, (-1, None))
                else:
                    # offset < next_write
                    # so we look in the index
                    assert next_index is not None
                    # only read from the index up to the next local write
                    subend = min(next_write.start, end) if next_write is not None else end
                    while offset >= next_index.end:
                        next_index_idx, next_index = next(index_iter, (-1, None))
                    assert offset >= next_index.start and offset < next_index.end
                    subend = min(subend, next_index.end)
                    assert subend <= end
                    #yield from next_index.data.leaves(offset, subend, depth + 1)
                    for path, offset, data in next_index.data.leaves(offset, subend):
                        # print('it index pathing', self, next_index_idx, len(path)+1, offset, offset+len(data))
                        # prepend the index entry we descended through
                        yield [next_index, *path], offset, data
                    offset = subend
                    assert offset <= end
            if start == self.start and end == self.end:
                # full-range walks must consume every index entry
                extra_index_idx, extra_index = next(index_iter, (-1, None))
                assert extra_index is None
    def __init__(self, latest = None):
        # tip: most recent Flush (or None); pending: sorted, disjoint Chunks
        # not yet flushed
        self.tip = latest
        self.pending = []
    def write(self, offset, data):
        """Buffer ``data`` at ``offset``, merging with any overlapping or
        adjacent pending chunk and keeping ``self.pending`` sorted."""
        #print('write', offset, offset+len(data))
        for idx, pending in [*enumerate(self.pending)]:
            # condition includes exact adjacency, so touching chunks coalesce
            if pending.start <= offset + len(data) and pending.end >= offset:
                # merge pending with data
                # then merge pending with neighbors
                if offset > pending.start:
                    merged_data = pending.data[:offset - pending.start] + data
                else:
                    merged_data = data
                if offset + len(data) < pending.end:
                    merged_data = merged_data + pending.data[offset + len(data) - pending.start:]
                # we could maybe merge 'merged' via recursion
                # basically we'd excise pending, and then write merged
                self.pending.pop(idx)
                # re-write the merged chunk so it can coalesce with further
                # neighbors
                return self.write(min(offset, pending.start), merged_data)
            elif pending.start > offset + len(data):
                # passed this data without overlap
                return self.pending.insert(idx, self.Chunk(offset, data))
        self.pending.append(self.Chunk(offset, data))
    def flush(self, verify:bytes=None):
        """Convert the pending writes into a new tip Flush.

        verify: optional full reference image, passed through to Flush for
            debug assertions.
        """
        self.tip = self.Flush(*self.pending, prev_flush=self.tip, verify=verify)
        self.pending = []
    def leaves(self, start = None, end = None):
        """Iterate the latest flush's leaves; implicitly returns None when
        nothing has been flushed yet."""
        if self.tip is not None:
            return self.tip.leaves(start, end)
if __name__ == '__main__':
    # Randomized stress test: mirror every write into a flat bytearray and
    # check that the flushed tree reproduces it exactly, while reporting the
    # index width and maximum leaf depth per flush.
    import random
    SIZE=4096
    #random.seed(1)
    # found min flush 3 thru 70
    store = Simple()
    comparison = bytearray(SIZE)
    store.write(0, bytes(SIZE))
    for flushes in range(1024):
        # buffer 1-32 random-range writes, then flush them as one Flush
        for writes in range(random.randint(1,32)):
            start = random.randint(0, SIZE)
            end = random.randint(0, SIZE)
            start, end = (start, end) if start <= end else (end, start)
            size = end - start
            if size:
                # uniform random bytes for the range; the previous
                # randint(0, (1<<(8*size) - 1)) actually parsed as
                # 1 << (8*size - 1) because shift binds looser than
                # subtraction, which skewed the data's high bit toward zero
                data = random.getrandbits(8*size).to_bytes(size,'little')
                comparison[start:end] = data
                store.write(start, data)
        # pending chunks must already agree with the reference image
        for pending in store.pending:
            assert comparison[pending.start:pending.end] == pending.data
        store.flush(verify=comparison)
        # walk all leaves: ascending order, any gaps are zero bytes, and
        # every leaf's contents match the reference image
        last_offset = 0
        max_depth = 0
        for path, offset, data in store.leaves():
            assert offset >= last_offset
            assert comparison[last_offset:offset] == bytes(offset - last_offset)
            last_offset = offset + len(data)
            assert comparison[offset:last_offset] == data
            max_depth = max(len(path), max_depth)
        assert comparison[last_offset:] == bytes(len(comparison) - last_offset)
        print(flushes, len(store.tip.index), 'x', max_depth, 'OK')