This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e9baf3e01799f4e026b1fad6543c2825310ecdca Author: Alex Petrov <[email protected]> AuthorDate: Mon Jul 22 10:11:42 2024 +0200 Add size to the segment index for safer journal reads Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-19871 --- .../apache/cassandra/journal/ActiveSegment.java | 22 ++-- .../apache/cassandra/journal/InMemoryIndex.java | 58 +++++----- src/java/org/apache/cassandra/journal/Index.java | 50 ++++++++- src/java/org/apache/cassandra/journal/Journal.java | 12 ++- .../org/apache/cassandra/journal/OnDiskIndex.java | 115 +++++++++++++++----- src/java/org/apache/cassandra/journal/Params.java | 5 + src/java/org/apache/cassandra/journal/Segment.java | 20 ++-- .../apache/cassandra/journal/SegmentWriter.java | 5 +- .../apache/cassandra/journal/StaticSegment.java | 4 +- .../org/apache/cassandra/journal/IndexTest.java | 120 ++++++++++++++------- 10 files changed, 291 insertions(+), 120 deletions(-) diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java index ebbd672b80..a815d23199 100644 --- a/src/java/org/apache/cassandra/journal/ActiveSegment.java +++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java @@ -125,9 +125,9 @@ final class ActiveSegment<K, V> extends Segment<K, V> * Expects the caller to acquire the ref to the segment and the record to exist. */ @Override - boolean read(int offset, EntrySerializer.EntryHolder<K> into) + boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into) { - ByteBuffer duplicate = buffer.duplicate().position(offset).limit(buffer.capacity()); + ByteBuffer duplicate = buffer.duplicate().position(offset).limit(offset + size); try { EntrySerializer.read(into, keySupport, duplicate, descriptor.userVersion); @@ -394,7 +394,7 @@ final class ActiveSegment<K, V> extends Segment<K, V> opGroup.close(); return null; } - return new Allocation(opGroup, buffer.duplicate().position(position).limit(position + totalSize)); + return new Allocation(opGroup, buffer.duplicate().position(position).limit(position + totalSize), totalSize); } catch (Throwable t) { @@ -431,13 +431,15 @@ final class ActiveSegment<K, V> extends Segment<K, V> { private final OpOrder.Group appendOp; private final ByteBuffer buffer; - private final int position; + private final int start; + private final int length; - Allocation(OpOrder.Group appendOp, ByteBuffer buffer) + Allocation(OpOrder.Group appendOp, ByteBuffer buffer, int length) { this.appendOp = appendOp; this.buffer = buffer; - this.position = buffer.position(); + this.start = buffer.position(); + this.length = length; } RecordPointer write(K id, ByteBuffer record, Set<Integer> hosts) @@ -445,9 +447,9 @@ final class ActiveSegment<K, V> extends Segment<K, V> try (BufferedDataOutputStreamPlus out = new DataOutputBufferFixed(buffer)) { EntrySerializer.write(id, record, hosts, keySupport, out, descriptor.userVersion); - index.update(id, position); + index.update(id, start, length); metadata.update(hosts); - return new RecordPointer(descriptor.timestamp, position); + return new RecordPointer(descriptor.timestamp, start); } catch (IOException e) { @@ -465,7 +467,7 @@ final class ActiveSegment<K, V> extends Segment<K, V> try (BufferedDataOutputStreamPlus out = new DataOutputBufferFixed(buffer)) { EntrySerializer.write(id, record, hosts, keySupport, out, descriptor.userVersion); - index.update(id, position); + index.update(id, start, length); metadata.update(hosts); } catch (IOException e) @@ -482,7 +484,7 @@ final class ActiveSegment<K, V> extends Segment<K, V> { try (Timer.Context ignored = waitingOnFlush.time()) { - waitForFlush(position); + waitForFlush(start); } } } diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java b/src/java/org/apache/cassandra/journal/InMemoryIndex.java index 1ff4a28d7a..5417bfea40 100644 --- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java +++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java @@ -34,9 +34,9 @@ import org.apache.cassandra.io.util.FileOutputStreamPlus; */ final class InMemoryIndex<K> extends Index<K> { - private static final int[] EMPTY = new int[0]; + private static final long[] EMPTY = new long[0]; - private final NavigableMap<K, int[]> index; + private final NavigableMap<K, long[]> index; // CSLM#lastKey() can be costly, so track lastId separately; // TODO: this could easily be premature and misguided; @@ -48,29 +48,31 @@ final class InMemoryIndex<K> extends Index<K> return new InMemoryIndex<>(keySupport, new ConcurrentSkipListMap<>(keySupport)); } - private InMemoryIndex(KeySupport<K> keySupport, NavigableMap<K, int[]> index) + private InMemoryIndex(KeySupport<K> keySupport, NavigableMap<K, long[]> index) { super(keySupport); this.index = index; this.lastId = new AtomicReference<>(); } - public void update(K id, int offset) + public void update(K id, int offset, int size) { - index.merge(id, new int[] { offset }, (current, value) -> - { - int idx = Arrays.binarySearch(current, offset); - if (idx >= 0) // repeat update() call; shouldn't occur, but we might as well allow this NOOP - return current; - - /* Merge the new offset with existing values */ - int pos = -idx - 1; - int[] merged = new int[current.length + 1]; - System.arraycopy(current, 0, merged, 0, pos); - merged[pos] = offset; - System.arraycopy(current, pos, merged, pos + 1, current.length - pos); - return merged; - }); + long currentOffsetAndSize = composeOffsetAndSize(offset, size); + index.merge(id, new long[] { currentOffsetAndSize }, + (current, value) -> + { + int idx = Arrays.binarySearch(current, currentOffsetAndSize); + if (idx >= 0) // repeat update() call; shouldn't occur, but we might as well allow this NOOP + return current; + + /* Merge the new offset with existing values */ + int pos = -idx - 1; + long[] merged = new long[current.length + 1]; + System.arraycopy(current, 0, merged, 0, pos); + merged[pos] = currentOffsetAndSize; + System.arraycopy(current, pos, merged, pos + 1, current.length - pos); + return merged; + }); lastId.accumulateAndGet(id, (current, update) -> (null == current || keySupport.compare(current, update) < 0) ? update : current); } @@ -90,20 +92,20 @@ final class InMemoryIndex<K> extends Index<K> } @Override - public int[] lookUp(K id) + public long[] lookUp(K id) { return mayContainId(id) ? index.getOrDefault(id, EMPTY) : EMPTY; } @Override - public int lookUpFirst(K id) + public long lookUpFirst(K id) { - int[] offests = lookUp(id); - return offests.length == 0 ? -1 : offests[0]; + long[] offsets = lookUp(id); + return offsets.length == 0 ? -1 : offsets[0]; } @Override - int[] lookUpAll(K id) + long[] lookUpAll(K id) { return lookUp(id); } @@ -128,10 +130,18 @@ final class InMemoryIndex<K> extends Index<K> static <K> InMemoryIndex<K> rebuild(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit) { InMemoryIndex<K> index = new InMemoryIndex<>(keySupport, new TreeMap<>(keySupport)); + try (StaticSegment.SequentialReader<K> reader = StaticSegment.reader(descriptor, keySupport, fsyncedLimit)) { + int last = -1; while (reader.advance()) - index.update(reader.id(), reader.offset()); + { + int current = reader.offset(); + if (last >= 0) + index.update(reader.id(), last, current); + last = current; + } + } return index; } diff --git a/src/java/org/apache/cassandra/journal/Index.java b/src/java/org/apache/cassandra/journal/Index.java index cd2b69f2e4..f42a42d5ed 100644 --- a/src/java/org/apache/cassandra/journal/Index.java +++ b/src/java/org/apache/cassandra/journal/Index.java @@ -41,7 +41,7 @@ abstract class Index<K> implements Closeable * * @return the found offsets into the segment, if any; can be empty */ - abstract int[] lookUp(K id); + abstract long[] lookUp(K id); /** * Look up offsets by id. It's possible, due to retries, for a segment @@ -50,8 +50,9 @@ abstract class Index<K> implements Closeable * * @return the first offset into the segment, or -1 is none were found */ - abstract int lookUpFirst(K id); - abstract int[] lookUpAll(K id); + abstract long lookUpFirst(K id); + + abstract long[] lookUpAll(K id); /** * @return the first (smallest) id in the index @@ -83,4 +84,47 @@ abstract class Index<K> implements Closeable { return any(ids, this::mayContainId); } + + interface IndexIterator<K> + { + boolean hasNext(); + K currentKey(); + int currentOffset(); + int currentSize(); + void next(); + } + + /** + * Helper methods + */ + + public static int readOffset(long record) + { + return (int) (0xffffffffL & (record >> 32)); + } + + public static long writeOffset(long record, int offset) + { + record &= 0x00000000ffffffffL; //unset all higher bits + record |= ((long) offset) << 32; + return record; + } + + public static int readSize(long record) + { + return (int) (0xffffffffL & record); + } + + public static long writeSize(long record, int size) + { + record &= 0xffffffff00000000L; // unset all lower bits + record |= (long) size; + return record; + } + + public static long composeOffsetAndSize(int offset, int size) + { + return writeSize(writeOffset(0, offset), size); + } + } diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index 37fd1fa92b..d633956e51 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -274,7 +274,7 @@ public class Journal<K, V> implements Shutdownable * * @return deserialized record if present, null otherwise */ - public V read(long segmentTimestamp, int offset) + public V read(long segmentTimestamp, int offset, int size) { try (ReferencedSegment<K, V> referenced = selectAndReference(segmentTimestamp)) { @@ -283,7 +283,7 @@ public class Journal<K, V> implements Shutdownable return null; EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>(); - segment.read(offset, holder); + segment.read(offset, size, holder); try (DataInputBuffer in = new DataInputBuffer(holder.value, false)) { @@ -383,11 +383,13 @@ public class Journal<K, V> implements Shutdownable { for (Segment<K, V> segment : segments.all()) { - int[] offsets = segment.index().lookUp(id); - for (int offset : offsets) + long[] offsets = segment.index().lookUp(id); + for (long offsetAndSize : offsets) { + int offset = Index.readOffset(offsetAndSize); + int size = Index.readSize(offsetAndSize); holder.clear(); - if (segment.read(offset, holder)) + if (segment.read(offset, size, holder)) { try (DataInputBuffer in = new DataInputBuffer(holder.value, false)) { diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java index 4cbb3d4e57..ba769d6163 100644 --- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java @@ -28,6 +28,7 @@ import java.util.zip.CRC32; import javax.annotation.Nullable; +import accord.utils.Invariants; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.File; @@ -36,6 +37,7 @@ import org.apache.cassandra.utils.Crc; import static org.apache.cassandra.journal.Journal.validateCRC; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; +import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong; /** * An on-disk (memory-mapped) index for a completed flushed segment. @@ -44,10 +46,10 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; */ final class OnDiskIndex<K> extends Index<K> { - private static final int[] EMPTY = new int[0]; + private static final long[] EMPTY = new long[0]; private static final int FILE_PREFIX_SIZE = 4 + 4; // count of entries, CRC - private static final int VALUE_SIZE = 4; // int offset + private static final int VALUE_SIZE = Long.BYTES; // int offset + int size private final int KEY_SIZE; private final int ENTRY_SIZE; @@ -146,7 +148,7 @@ final class OnDiskIndex<K> extends Index<K> } static <K> void write( - NavigableMap<K, int[]> entries, KeySupport<K> keySupport, DataOutputPlus out, int userVersion) throws IOException + NavigableMap<K, long[]> entries, KeySupport<K> keySupport, DataOutputPlus out, int userVersion) throws IOException { CRC32 crc = Crc.crc32(); @@ -158,16 +160,25 @@ final class OnDiskIndex<K> extends Index<K> updateChecksumInt(crc, size); out.writeInt((int) crc.getValue()); - for (Map.Entry<K, int[]> entry : entries.entrySet()) + for (Map.Entry<K, long[]> entry : entries.entrySet()) { - for (int offset : entry.getValue()) + long prev = -1; + for (long offsetAndSize : entry.getValue()) { K key = entry.getKey(); keySupport.serialize(key, out, userVersion); keySupport.updateChecksum(crc, key, userVersion); - out.writeInt(offset); - updateChecksumInt(crc, offset); + if (prev != -1) + { + long tmp = prev; + Invariants.checkState(readOffset(offsetAndSize) > readOffset(prev), + () -> String.format("Offsets should be strictly monotonic, but found %d following %d", + readOffset(offsetAndSize), readOffset(tmp))); + } + out.writeLong(offsetAndSize); + updateChecksumLong(crc, offsetAndSize); + prev = offsetAndSize; } } @@ -189,7 +200,7 @@ final class OnDiskIndex<K> extends Index<K> } @Override - public int[] lookUp(K id) + public long[] lookUp(K id) { if (!mayContainId(id)) return EMPTY; @@ -198,7 +209,7 @@ final class OnDiskIndex<K> extends Index<K> if (keyIndex < 0) return EMPTY; - int[] offsets = new int[] { offsetAtIndex(keyIndex) }; + long[] records = new long[] { recordAtIndex(keyIndex) }; /* * Duplicate entries are possible within one segment (but should be rare). @@ -207,27 +218,27 @@ final class OnDiskIndex<K> extends Index<K> for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--) { - int length = offsets.length; - offsets = Arrays.copyOf(offsets, length + 1); - offsets[length] = offsetAtIndex(i); + int length = records.length; + records = Arrays.copyOf(records, length + 1); + records[length] = recordAtIndex(i); } for (int i = keyIndex + 1; i < entryCount && id.equals(keyAtIndex(i)); i++) { - int length = offsets.length; - offsets = Arrays.copyOf(offsets, length + 1); - offsets[length] = offsetAtIndex(i); + int length = records.length; + records = Arrays.copyOf(records, length + 1); + records[length] = recordAtIndex(i); } - Arrays.sort(offsets); - return offsets; + Arrays.sort(records); + return records; } @Override - public int lookUpFirst(K id) + public long lookUpFirst(K id) { if (!mayContainId(id)) - return -1; + return -1L; int keyIndex = binarySearch(id); @@ -238,14 +249,14 @@ final class OnDiskIndex<K> extends Index<K> for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--) keyIndex = i; - return keyIndex < 0 ? -1 : offsetAtIndex(keyIndex); + return keyIndex < 0 ? -1 : recordAtIndex(keyIndex); } @Override - public int[] lookUpAll(K id) + public long[] lookUpAll(K id) { if (!mayContainId(id)) - return new int[0]; + return new long[0]; int start = binarySearch(id); int firstKeyIndex = start; @@ -254,31 +265,81 @@ final class OnDiskIndex<K> extends Index<K> firstKeyIndex = i; if (firstKeyIndex < 0) - return new int[0]; + return new long[0]; int lastKeyIndex = start; for (int i = lastKeyIndex + 1; i < entryCount && id.equals(keyAtIndex(i)); i++) lastKeyIndex = i; - int[] all = new int[lastKeyIndex - firstKeyIndex + 1]; + long[] all = new long[lastKeyIndex - firstKeyIndex + 1]; int idx = firstKeyIndex; for (int i = 0; i < all.length; i++) { - all[i] = offsetAtIndex(idx); + all[i] = recordAtIndex(idx); idx++; } return all; } + public IndexIterator<K> iterator() + { + return new IndexIteratorImpl(); + } + + private class IndexIteratorImpl implements IndexIterator<K> + { + int currentIdx; + K currentKey; + int currentOffset; + int currentSize; + + IndexIteratorImpl() + { + currentIdx = -1; + } + + @Override + public boolean hasNext() + { + return currentIdx < (entryCount - 1); + } + + @Override + public K currentKey() + { + return currentKey; + } + + @Override + public int currentOffset() + { + return currentOffset; + } + + @Override + public int currentSize() + { + return currentSize; + } + + public void next() + { + currentIdx++; + currentKey = keyAtIndex(currentIdx); + long record = recordAtIndex(currentIdx); + currentOffset = Index.readOffset(record); + currentSize = Index.readSize(record); + } + } private K keyAtIndex(int index) { return keySupport.deserialize(buffer, FILE_PREFIX_SIZE + index * ENTRY_SIZE, descriptor.userVersion); } - private int offsetAtIndex(int index) + private long recordAtIndex(int index) { - return buffer.getInt(FILE_PREFIX_SIZE + index * ENTRY_SIZE + KEY_SIZE); + return buffer.getLong(FILE_PREFIX_SIZE + index * ENTRY_SIZE + KEY_SIZE); } /* diff --git a/src/java/org/apache/cassandra/journal/Params.java b/src/java/org/apache/cassandra/journal/Params.java index 46b382ea27..17e719ce5d 100644 --- a/src/java/org/apache/cassandra/journal/Params.java +++ b/src/java/org/apache/cassandra/journal/Params.java @@ -43,6 +43,11 @@ public interface Params */ int flushPeriodMillis(); + default int flushPeriodNanos() + { + return flushPeriodMillis() * 1_000_000; + } + /** * @return milliseconds to block writes for while waiting for a slow disk flush to complete * when in {@link FlushMode#PERIODIC} mode diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java index 6700cb1445..e548c52128 100644 --- a/src/java/org/apache/cassandra/journal/Segment.java +++ b/src/java/org/apache/cassandra/journal/Segment.java @@ -58,12 +58,14 @@ abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>> boolean readFirst(K id, RecordConsumer<K> consumer) { - int offset = index().lookUpFirst(id); - if (offset == -1) + long offsetAndSize = index().lookUpFirst(id); + if (offsetAndSize == -1) return false; EntrySerializer.EntryHolder<K> into = new EntrySerializer.EntryHolder<>(); - if (read(offset, into)) + int offset = Index.readOffset(offsetAndSize); + int size = Index.readSize(offset); + if (read(offset, size, into)) { Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key); consumer.accept(descriptor.timestamp, offset, id, into.value, into.hosts, descriptor.userVersion); @@ -74,8 +76,8 @@ abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>> boolean readFirst(K id, EntrySerializer.EntryHolder<K> into) { - int offset = index().lookUpFirst(id); - if (offset == -1 || !read(offset, into)) + long offsetAndSize = index().lookUpFirst(id); + if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), Index.readSize(offsetAndSize), into)) return false; Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key); return true; @@ -83,14 +85,16 @@ abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>> void readAll(K id, EntrySerializer.EntryHolder<K> into, Runnable onEntry) { - int[] all = index().lookUpAll(id); + long[] all = index().lookUpAll(id); for (int i = 0; i < all.length; i++) { - Invariants.checkState(read(all[i], into), "Read should always return true"); + int offset = Index.readOffset(all[i]); + int size = Index.readSize(all[i]); + Invariants.checkState(read(offset, size, into), "Read should always return true"); onEntry.run(); } } - abstract boolean read(int offset, EntrySerializer.EntryHolder<K> into); + abstract boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into); } diff --git a/src/java/org/apache/cassandra/journal/SegmentWriter.java b/src/java/org/apache/cassandra/journal/SegmentWriter.java index b8436aed66..09797c363e 100644 --- a/src/java/org/apache/cassandra/journal/SegmentWriter.java +++ b/src/java/org/apache/cassandra/journal/SegmentWriter.java @@ -70,10 +70,9 @@ final class SegmentWriter<K> implements Closeable int position = position(); try { - index.update(key, position); - metadata.update(hosts); - EntrySerializer.write(key, record, hosts, keySupport, trackedOut, descriptor.userVersion); + index.update(key, position, position() - position); + metadata.update(hosts); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java index a25fab0483..f63701771c 100644 --- a/src/java/org/apache/cassandra/journal/StaticSegment.java +++ b/src/java/org/apache/cassandra/journal/StaticSegment.java @@ -205,9 +205,9 @@ final class StaticSegment<K, V> extends Segment<K, V> * Expects the record to have been written at this offset, but potentially not flushed and lost. */ @Override - boolean read(int offset, EntrySerializer.EntryHolder<K> into) + boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into) { - ByteBuffer duplicate = buffer.duplicate().position(offset); + ByteBuffer duplicate = buffer.duplicate().position(offset).limit(offset + size); try (DataInputBuffer in = new DataInputBuffer(duplicate, false)) { return EntrySerializer.tryRead(into, keySupport, duplicate, in, syncedOffsets.syncedOffset(), descriptor.userVersion); diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java b/test/unit/org/apache/cassandra/journal/IndexTest.java index 8301ea988e..5e7314a338 100644 --- a/test/unit/org/apache/cassandra/journal/IndexTest.java +++ b/test/unit/org/apache/cassandra/journal/IndexTest.java @@ -19,21 +19,28 @@ package org.apache.cassandra.journal; import java.io.IOException; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.TreeMap; import java.util.stream.Collectors; import com.google.common.collect.Maps; +import org.junit.Assert; import org.junit.Test; import org.agrona.collections.IntHashSet; import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.Generators; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.TimeUUID; import org.quicktheories.core.Gen; import org.quicktheories.impl.Constraint; +import static org.apache.cassandra.journal.Index.composeOffsetAndSize; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; @@ -44,7 +51,7 @@ import static org.quicktheories.QuickTheory.qt; public class IndexTest { - private static final int[] EMPTY = new int[0]; + private static final long[] EMPTY = new long[0]; @Test public void testInMemoryIndexBasics() @@ -70,17 +77,18 @@ public class IndexTest int val32 = 3200; int val33 = 3300; - index.update(key1, val11); - index.update(key2, val21); - index.update(key2, val22); - index.update(key3, val31); - index.update(key3, val32); - index.update(key3, val33); + index.update(key1, val11, 1); + index.update(key2, val21, 2); + index.update(key2, val22, 3); + index.update(key3, val31, 4); + index.update(key3, val32, 5); + index.update(key3, val33, 6); assertArrayEquals(EMPTY, index.lookUp(key0)); - assertArrayEquals(new int[] { val11 }, index.lookUp(key1)); - assertArrayEquals(new int[] { val21, val22 }, index.lookUp(key2)); - assertArrayEquals(new int[] { val31, val32, val33 }, index.lookUp(key3)); + + assertArrayEquals(new long[] { composeOffsetAndSize(val11, 1) }, index.lookUp(key1)); + assertArrayEquals(new long[] { composeOffsetAndSize(val21, 2), composeOffsetAndSize(val22, 3) }, index.lookUp(key2)); + assertArrayEquals(new long[] { composeOffsetAndSize(val31, 4), composeOffsetAndSize(val32, 5), composeOffsetAndSize(val33, 6) }, index.lookUp(key3)); assertArrayEquals(EMPTY, index.lookUp(key4)); assertEquals(key1, index.firstId()); @@ -111,12 +119,12 @@ public class IndexTest int val32 = 3200; int val33 = 3300; - inMemory.update(key1, val11); - inMemory.update(key2, val21); - inMemory.update(key2, val22); - inMemory.update(key3, val31); - inMemory.update(key3, val32); - inMemory.update(key3, val33); + inMemory.update(key1, val11, 1); + inMemory.update(key2, val21, 2); + inMemory.update(key2, val22, 3); + inMemory.update(key3, val31, 4); + inMemory.update(key3, val32, 5); + inMemory.update(key3, val33, 6); File directory = new File(Files.createTempDirectory(null)); directory.deleteOnExit(); @@ -126,9 +134,9 @@ public class IndexTest try (OnDiskIndex<TimeUUID> onDisk = OnDiskIndex.open(descriptor, TimeUUIDKeySupport.INSTANCE)) { assertArrayEquals(EMPTY, onDisk.lookUp(key0)); - assertArrayEquals(new int[] { val11 }, onDisk.lookUp(key1)); - assertArrayEquals(new int[] { val21, val22 }, onDisk.lookUp(key2)); - assertArrayEquals(new int[] { val31, val32, val33 }, onDisk.lookUp(key3)); + assertArrayEquals(new long[] { composeOffsetAndSize(val11, 1) }, onDisk.lookUp(key1)); + assertArrayEquals(new long[] { composeOffsetAndSize(val21, 2), composeOffsetAndSize(val22, 3) }, onDisk.lookUp(key2)); + assertArrayEquals(new long[] { composeOffsetAndSize(val31, 4), composeOffsetAndSize(val32, 5), composeOffsetAndSize(val33, 6) }, onDisk.lookUp(key3)); assertArrayEquals(EMPTY, onDisk.lookUp(key4)); assertEquals(key1, onDisk.firstId()); @@ -149,34 +157,34 @@ public class IndexTest Constraint valueSizeConstraint = Constraint.between(0, 10); Constraint positionConstraint = Constraint.between(0, Integer.MAX_VALUE); Gen<TimeUUID> keyGen = Generators.timeUUID(); - Gen<int[]> valueGen = rs -> { - int[] array = new int[(int) rs.next(valueSizeConstraint)]; + Gen<long[]> valueGen = rs -> { + long[] array = new long[(int) rs.next(valueSizeConstraint)]; IntHashSet uniq = new IntHashSet(); for (int i = 0; i < array.length; i++) { - int value = (int) rs.next(positionConstraint); - while (!uniq.add(value)) - value = (int) rs.next(positionConstraint); - array[i] = value; + int offset = (int) rs.next(positionConstraint); + while (!uniq.add(offset)) + offset = (int) rs.next(positionConstraint); + array[i] = Index.composeOffsetAndSize(offset, (int) rs.next(positionConstraint)); } return array; }; - Gen<Map<TimeUUID, int[]>> gen = rs -> { + Gen<Map<TimeUUID, long[]>> gen = rs -> { int size = (int) rs.next(sizeConstraint); - Map<TimeUUID, int[]> map = Maps.newHashMapWithExpectedSize(size); + Map<TimeUUID, long[]> map = Maps.newHashMapWithExpectedSize(size); for (int i = 0; i < size; i++) { TimeUUID key = keyGen.generate(rs); while (map.containsKey(key)) key = keyGen.generate(rs); - int[] value = valueGen.generate(rs); + long[] value = valueGen.generate(rs); map.put(key, value); } return map; }; gen = gen.describedAs(map -> { StringBuilder sb = new StringBuilder(); - for (Map.Entry<TimeUUID, int[]> entry : map.entrySet()) + for (Map.Entry<TimeUUID, long[]> entry : map.entrySet()) sb.append('\n').append(entry.getKey()).append('\t').append(Arrays.toString(entry.getValue())); return sb.toString(); }); @@ -185,19 +193,19 @@ public class IndexTest qt().forAll(gen).checkAssert(map -> test(directory, map)); } - private static void test(File directory, Map<TimeUUID, int[]> map) + private static void test(File directory, Map<TimeUUID, long[]> map) { InMemoryIndex<TimeUUID> inMemory = InMemoryIndex.create(TimeUUIDKeySupport.INSTANCE); - for (Map.Entry<TimeUUID, int[]> e : map.entrySet()) + for (Map.Entry<TimeUUID, long[]> e : map.entrySet()) { TimeUUID key = e.getKey(); assertThat(inMemory.lookUp(key)).isEmpty(); - int[] value = e.getValue(); + long[] value = e.getValue(); if (value.length == 0) continue; - for (int i : value) - inMemory.update(key, i); + for (long i : value) + inMemory.update(key, Index.readOffset(i), Index.readSize(i)); Arrays.sort(value); } assertIndex(map, inMemory); @@ -208,10 +216,29 @@ public class IndexTest try (OnDiskIndex<TimeUUID> onDisk = OnDiskIndex.open(descriptor, TimeUUIDKeySupport.INSTANCE)) { assertIndex(map, onDisk); + + List<Pair<TimeUUID, Long>> sortedEntries = new ArrayList<>(); + for (Map.Entry<TimeUUID, long[]> entry : new TreeMap<>(map).entrySet()) + { + for (long l : entry.getValue()) + sortedEntries.add(Pair.create(entry.getKey(), l)); + } + + Index.IndexIterator<TimeUUID> iter = onDisk.iterator(); + Iterator<Pair<TimeUUID, Long>> expectedIter = sortedEntries.iterator(); + while (iter.hasNext()) + { + iter.next(); + Pair<TimeUUID, Long> expected = expectedIter.next(); + Assert.assertEquals(iter.currentKey(), expected.left); + Assert.assertEquals(iter.currentSize(), Index.readSize(expected.right)); + Assert.assertEquals(iter.currentOffset(), Index.readOffset(expected.right)); + } } } - private static void assertIndex(Map<TimeUUID, int[]> expected, Index<TimeUUID> actual) + + private static void assertIndex(Map<TimeUUID, long[]> expected, Index<TimeUUID> actual) { List<TimeUUID> keys = expected.entrySet() .stream() @@ -231,11 +258,11 @@ public class IndexTest assertThat(actual.lastId()).describedAs("Index %s had wrong lastId", actual).isEqualTo(keys.get(keys.size() - 1)); } - for (Map.Entry<TimeUUID, int[]> e : expected.entrySet()) + for (Map.Entry<TimeUUID, long[]> e : expected.entrySet()) { TimeUUID key = e.getKey(); - int[] value = e.getValue(); - int[] read = actual.lookUp(key); + long[] value = e.getValue(); + long[] read = actual.lookUp(key); if (value.length == 0) { @@ -248,4 +275,21 @@ public class IndexTest } } } + + @Test + public void testHelperMethods() + { + Random r = new Random(); + for (int i = 0; i < 1000000; i++) + { + long record = 0; + int size = Math.abs(r.nextInt()); + record = Index.writeSize(record, size); + int offset = Math.abs(r.nextInt()); + record = Index.writeOffset(record, offset); + assertEquals(size, Index.readSize(record)); + assertEquals(offset, Index.readOffset(record)); + assertEquals(record, composeOffsetAndSize(offset, size)); + } + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
