This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e9baf3e01799f4e026b1fad6543c2825310ecdca
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Jul 22 10:11:42 2024 +0200

    Add size to the segment index for safer journal reads
    
    Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-19871
---
 .../apache/cassandra/journal/ActiveSegment.java    |  22 ++--
 .../apache/cassandra/journal/InMemoryIndex.java    |  58 +++++-----
 src/java/org/apache/cassandra/journal/Index.java   |  50 ++++++++-
 src/java/org/apache/cassandra/journal/Journal.java |  12 ++-
 .../org/apache/cassandra/journal/OnDiskIndex.java  | 115 +++++++++++++++-----
 src/java/org/apache/cassandra/journal/Params.java  |   5 +
 src/java/org/apache/cassandra/journal/Segment.java |  20 ++--
 .../apache/cassandra/journal/SegmentWriter.java    |   5 +-
 .../apache/cassandra/journal/StaticSegment.java    |   4 +-
 .../org/apache/cassandra/journal/IndexTest.java    | 120 ++++++++++++++-------
 10 files changed, 291 insertions(+), 120 deletions(-)

diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java 
b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index ebbd672b80..a815d23199 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -125,9 +125,9 @@ final class ActiveSegment<K, V> extends Segment<K, V>
      * Expects the caller to acquire the ref to the segment and the record to 
exist.
      */
     @Override
-    boolean read(int offset, EntrySerializer.EntryHolder<K> into)
+    boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into)
     {
-        ByteBuffer duplicate = 
buffer.duplicate().position(offset).limit(buffer.capacity());
+        ByteBuffer duplicate = 
buffer.duplicate().position(offset).limit(offset + size);
         try
         {
             EntrySerializer.read(into, keySupport, duplicate, 
descriptor.userVersion);
@@ -394,7 +394,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
                 opGroup.close();
                 return null;
             }
-            return new Allocation(opGroup, 
buffer.duplicate().position(position).limit(position + totalSize));
+            return new Allocation(opGroup, 
buffer.duplicate().position(position).limit(position + totalSize), totalSize);
         }
         catch (Throwable t)
         {
@@ -431,13 +431,15 @@ final class ActiveSegment<K, V> extends Segment<K, V>
     {
         private final OpOrder.Group appendOp;
         private final ByteBuffer buffer;
-        private final int position;
+        private final int start;
+        private final int length;
 
-        Allocation(OpOrder.Group appendOp, ByteBuffer buffer)
+        Allocation(OpOrder.Group appendOp, ByteBuffer buffer, int length)
         {
             this.appendOp = appendOp;
             this.buffer = buffer;
-            this.position = buffer.position();
+            this.start = buffer.position();
+            this.length = length;
         }
 
         RecordPointer write(K id, ByteBuffer record, Set<Integer> hosts)
@@ -445,9 +447,9 @@ final class ActiveSegment<K, V> extends Segment<K, V>
             try (BufferedDataOutputStreamPlus out = new 
DataOutputBufferFixed(buffer))
             {
                 EntrySerializer.write(id, record, hosts, keySupport, out, 
descriptor.userVersion);
-                index.update(id, position);
+                index.update(id, start, length);
                 metadata.update(hosts);
-                return new RecordPointer(descriptor.timestamp, position);
+                return new RecordPointer(descriptor.timestamp, start);
             }
             catch (IOException e)
             {
@@ -465,7 +467,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
             try (BufferedDataOutputStreamPlus out = new 
DataOutputBufferFixed(buffer))
             {
                 EntrySerializer.write(id, record, hosts, keySupport, out, 
descriptor.userVersion);
-                index.update(id, position);
+                index.update(id, start, length);
                 metadata.update(hosts);
             }
             catch (IOException e)
@@ -482,7 +484,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
         {
             try (Timer.Context ignored = waitingOnFlush.time())
             {
-                waitForFlush(position);
+                waitForFlush(start);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java 
b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
index 1ff4a28d7a..5417bfea40 100644
--- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java
+++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
@@ -34,9 +34,9 @@ import org.apache.cassandra.io.util.FileOutputStreamPlus;
  */
 final class InMemoryIndex<K> extends Index<K>
 {
-    private static final int[] EMPTY = new int[0];
+    private static final long[] EMPTY = new long[0];
 
-    private final NavigableMap<K, int[]> index;
+    private final NavigableMap<K, long[]> index;
 
     // CSLM#lastKey() can be costly, so track lastId separately;
     // TODO: this could easily be premature and misguided;
@@ -48,29 +48,31 @@ final class InMemoryIndex<K> extends Index<K>
         return new InMemoryIndex<>(keySupport, new 
ConcurrentSkipListMap<>(keySupport));
     }
 
-    private InMemoryIndex(KeySupport<K> keySupport, NavigableMap<K, int[]> 
index)
+    private InMemoryIndex(KeySupport<K> keySupport, NavigableMap<K, long[]> 
index)
     {
         super(keySupport);
         this.index = index;
         this.lastId = new AtomicReference<>();
     }
 
-    public void update(K id, int offset)
+    public void update(K id, int offset, int size)
     {
-        index.merge(id, new int[] { offset }, (current, value) ->
-        {
-            int idx = Arrays.binarySearch(current, offset);
-            if (idx >= 0) // repeat update() call; shouldn't occur, but we 
might as well allow this NOOP
-                return current;
-
-            /* Merge the new offset with existing values */
-            int pos = -idx - 1;
-            int[] merged = new int[current.length + 1];
-            System.arraycopy(current, 0, merged, 0, pos);
-            merged[pos] = offset;
-            System.arraycopy(current, pos, merged, pos + 1, current.length - 
pos);
-            return merged;
-        });
+        long currentOffsetAndSize = composeOffsetAndSize(offset, size);
+        index.merge(id, new long[] { currentOffsetAndSize },
+                    (current, value) ->
+                    {
+                        int idx = Arrays.binarySearch(current, 
currentOffsetAndSize);
+                        if (idx >= 0) // repeat update() call; shouldn't 
occur, but we might as well allow this NOOP
+                            return current;
+
+                        /* Merge the new offset with existing values */
+                        int pos = -idx - 1;
+                        long[] merged = new long[current.length + 1];
+                        System.arraycopy(current, 0, merged, 0, pos);
+                        merged[pos] = currentOffsetAndSize;
+                        System.arraycopy(current, pos, merged, pos + 1, 
current.length - pos);
+                        return merged;
+                    });
 
         lastId.accumulateAndGet(id, (current, update) -> (null == current || 
keySupport.compare(current, update) < 0) ? update : current);
     }
@@ -90,20 +92,20 @@ final class InMemoryIndex<K> extends Index<K>
     }
 
     @Override
-    public int[] lookUp(K id)
+    public long[] lookUp(K id)
     {
         return mayContainId(id) ? index.getOrDefault(id, EMPTY) : EMPTY;
     }
 
     @Override
-    public int lookUpFirst(K id)
+    public long lookUpFirst(K id)
     {
-        int[] offests = lookUp(id);
-        return offests.length == 0 ? -1 : offests[0];
+        long[] offsets = lookUp(id);
+        return offsets.length == 0 ? -1 : offsets[0];
     }
 
     @Override
-    int[] lookUpAll(K id)
+    long[] lookUpAll(K id)
     {
         return lookUp(id);
     }
@@ -128,10 +130,18 @@ final class InMemoryIndex<K> extends Index<K>
     static <K> InMemoryIndex<K> rebuild(Descriptor descriptor, KeySupport<K> 
keySupport, int fsyncedLimit)
     {
         InMemoryIndex<K> index = new InMemoryIndex<>(keySupport, new 
TreeMap<>(keySupport));
+
         try (StaticSegment.SequentialReader<K> reader = 
StaticSegment.reader(descriptor, keySupport, fsyncedLimit))
         {
+            int last = -1; // offset seen on the previous iteration; -1 until the first record has been visited
             while (reader.advance())
-                index.update(reader.id(), reader.offset());
+            {
+                int current = reader.offset(); // offset reported by the reader for the record it just advanced to
+                if (last >= 0)
+                    index.update(reader.id(), last, current); // NOTE(review): pairs the *current* id with the *previous* offset and passes `current` (not `current - last`) as size - confirm intended
+                last = current;
+            } // NOTE(review): the offset of the final record is never passed to update() - confirm whether it should be indexed
+
         }
         return index;
     }
diff --git a/src/java/org/apache/cassandra/journal/Index.java 
b/src/java/org/apache/cassandra/journal/Index.java
index cd2b69f2e4..f42a42d5ed 100644
--- a/src/java/org/apache/cassandra/journal/Index.java
+++ b/src/java/org/apache/cassandra/journal/Index.java
@@ -41,7 +41,7 @@ abstract class Index<K> implements Closeable
      *
      * @return the found offsets into the segment, if any; can be empty
      */
-    abstract int[] lookUp(K id);
+    abstract long[] lookUp(K id);
 
     /**
      * Look up offsets by id. It's possible, due to retries, for a segment
@@ -50,8 +50,9 @@ abstract class Index<K> implements Closeable
      *
      * @return the first offset into the segment, or -1 is none were found
      */
-    abstract int lookUpFirst(K id);
-    abstract int[] lookUpAll(K id);
+    abstract long lookUpFirst(K id);
+
+    abstract long[] lookUpAll(K id);
 
     /**
      * @return the first (smallest) id in the index
@@ -83,4 +84,47 @@ abstract class Index<K> implements Closeable
     {
         return any(ids, this::mayContainId);
     }
+
+    interface IndexIterator<K>
+    {
+        boolean hasNext();
+        K currentKey();
+        int currentOffset();
+        int currentSize();
+        void next();
+    }
+
+    /**
+     * Helper methods
+     */
+
+    public static int readOffset(long record)
+    {
+        return (int) (0xffffffffL & (record >> 32));
+    }
+
+    public static long writeOffset(long record, int offset)
+    {
+        record &= 0x00000000ffffffffL; //unset all higher bits
+        record |= ((long) offset) << 32;
+        return record;
+    }
+
+    public static int readSize(long record)
+    {
+        return (int) (0xffffffffL & record);
+    }
+
+    public static long writeSize(long record, int size)
+    {
+        record &= 0xffffffff00000000L; // unset all lower bits; the upper 32 bits (offset) are kept as-is
+        record |= size & 0xffffffffL;  // mask to 32 bits: a negative int would otherwise sign-extend over the offset
+        return record;
+    }
+
+    public static long composeOffsetAndSize(int offset, int size)
+    {
+        return writeSize(writeOffset(0, offset), size);
+    }
+
 }
diff --git a/src/java/org/apache/cassandra/journal/Journal.java 
b/src/java/org/apache/cassandra/journal/Journal.java
index 37fd1fa92b..d633956e51 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -274,7 +274,7 @@ public class Journal<K, V> implements Shutdownable
      *
      * @return deserialized record if present, null otherwise
      */
-    public V read(long segmentTimestamp, int offset)
+    public V read(long segmentTimestamp, int offset, int size)
     {
         try (ReferencedSegment<K, V> referenced = 
selectAndReference(segmentTimestamp))
         {
@@ -283,7 +283,7 @@ public class Journal<K, V> implements Shutdownable
                 return null;
 
             EntrySerializer.EntryHolder<K> holder = new 
EntrySerializer.EntryHolder<>();
-            segment.read(offset, holder);
+            segment.read(offset, size, holder);
 
             try (DataInputBuffer in = new DataInputBuffer(holder.value, false))
             {
@@ -383,11 +383,13 @@ public class Journal<K, V> implements Shutdownable
         {
             for (Segment<K, V> segment : segments.all())
             {
-                int[] offsets = segment.index().lookUp(id);
-                for (int offset : offsets)
+                long[] offsets = segment.index().lookUp(id);
+                for (long offsetAndSize : offsets)
                 {
+                    int offset = Index.readOffset(offsetAndSize);
+                    int size = Index.readSize(offsetAndSize);
                     holder.clear();
-                    if (segment.read(offset, holder))
+                    if (segment.read(offset, size, holder))
                     {
                         try (DataInputBuffer in = new 
DataInputBuffer(holder.value, false))
                         {
diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java 
b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
index 4cbb3d4e57..ba769d6163 100644
--- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
@@ -28,6 +28,7 @@ import java.util.zip.CRC32;
 
 import javax.annotation.Nullable;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.File;
@@ -36,6 +37,7 @@ import org.apache.cassandra.utils.Crc;
 
 import static org.apache.cassandra.journal.Journal.validateCRC;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong;
 
 /**
  * An on-disk (memory-mapped) index for a completed flushed segment.
@@ -44,10 +46,10 @@ import static 
org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
  */
 final class OnDiskIndex<K> extends Index<K>
 {
-    private static final int[] EMPTY = new int[0];
+    private static final long[] EMPTY = new long[0];
 
     private static final int FILE_PREFIX_SIZE = 4 + 4; // count of entries, CRC
-    private static final int VALUE_SIZE = 4;           // int offset
+    private static final int VALUE_SIZE = Long.BYTES;   // int offset + int 
size
 
     private final int KEY_SIZE;
     private final int ENTRY_SIZE;
@@ -146,7 +148,7 @@ final class OnDiskIndex<K> extends Index<K>
     }
 
     static <K> void write(
-        NavigableMap<K, int[]> entries, KeySupport<K> keySupport, 
DataOutputPlus out, int userVersion) throws IOException
+        NavigableMap<K, long[]> entries, KeySupport<K> keySupport, 
DataOutputPlus out, int userVersion) throws IOException
     {
         CRC32 crc = Crc.crc32();
 
@@ -158,16 +160,25 @@ final class OnDiskIndex<K> extends Index<K>
         updateChecksumInt(crc, size);
         out.writeInt((int) crc.getValue());
 
-        for (Map.Entry<K, int[]> entry : entries.entrySet())
+        for (Map.Entry<K, long[]> entry : entries.entrySet())
         {
-            for (int offset : entry.getValue())
+            long prev = -1;
+            for (long offsetAndSize : entry.getValue())
             {
                 K key = entry.getKey();
                 keySupport.serialize(key, out, userVersion);
                 keySupport.updateChecksum(crc, key, userVersion);
 
-                out.writeInt(offset);
-                updateChecksumInt(crc, offset);
+                if (prev != -1)
+                {
+                    long tmp = prev;
+                    Invariants.checkState(readOffset(offsetAndSize) > 
readOffset(prev),
+                                          () -> String.format("Offsets should 
be strictly monotonic, but found %d following %d",
+                                                              
readOffset(offsetAndSize), readOffset(tmp)));
+                }
+                out.writeLong(offsetAndSize);
+                updateChecksumLong(crc, offsetAndSize);
+                prev = offsetAndSize;
             }
         }
 
@@ -189,7 +200,7 @@ final class OnDiskIndex<K> extends Index<K>
     }
 
     @Override
-    public int[] lookUp(K id)
+    public long[] lookUp(K id)
     {
         if (!mayContainId(id))
             return EMPTY;
@@ -198,7 +209,7 @@ final class OnDiskIndex<K> extends Index<K>
         if (keyIndex < 0)
             return EMPTY;
 
-        int[] offsets = new int[] { offsetAtIndex(keyIndex) };
+        long[] records = new long[] { recordAtIndex(keyIndex) };
 
         /*
          * Duplicate entries are possible within one segment (but should be 
rare).
@@ -207,27 +218,27 @@ final class OnDiskIndex<K> extends Index<K>
 
         for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--)
         {
-            int length = offsets.length;
-            offsets = Arrays.copyOf(offsets, length + 1);
-            offsets[length] = offsetAtIndex(i);
+            int length = records.length;
+            records = Arrays.copyOf(records, length + 1);
+            records[length] = recordAtIndex(i);
         }
 
         for (int i = keyIndex + 1; i < entryCount && id.equals(keyAtIndex(i)); 
i++)
         {
-            int length = offsets.length;
-            offsets = Arrays.copyOf(offsets, length + 1);
-            offsets[length] = offsetAtIndex(i);
+            int length = records.length;
+            records = Arrays.copyOf(records, length + 1);
+            records[length] = recordAtIndex(i);
         }
 
-        Arrays.sort(offsets);
-        return offsets;
+        Arrays.sort(records);
+        return records;
     }
 
     @Override
-    public int lookUpFirst(K id)
+    public long lookUpFirst(K id)
     {
         if (!mayContainId(id))
-            return -1;
+            return -1L;
 
         int keyIndex = binarySearch(id);
 
@@ -238,14 +249,14 @@ final class OnDiskIndex<K> extends Index<K>
         for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--)
             keyIndex = i;
 
-        return keyIndex < 0 ? -1 : offsetAtIndex(keyIndex);
+        return keyIndex < 0 ? -1 : recordAtIndex(keyIndex);
     }
 
     @Override
-    public int[] lookUpAll(K id)
+    public long[] lookUpAll(K id)
     {
         if (!mayContainId(id))
-            return new int[0];
+            return new long[0];
 
         int start = binarySearch(id);
         int firstKeyIndex = start;
@@ -254,31 +265,81 @@ final class OnDiskIndex<K> extends Index<K>
             firstKeyIndex = i;
 
         if (firstKeyIndex < 0)
-            return new int[0];
+            return new long[0];
 
         int lastKeyIndex = start;
 
         for (int i = lastKeyIndex + 1; i < entryCount && 
id.equals(keyAtIndex(i)); i++)
             lastKeyIndex = i;
 
-        int[] all = new int[lastKeyIndex - firstKeyIndex + 1];
+        long[] all = new long[lastKeyIndex - firstKeyIndex + 1];
         int idx = firstKeyIndex;
         for (int i = 0; i < all.length; i++)
         {
-            all[i] = offsetAtIndex(idx);
+            all[i] = recordAtIndex(idx);
             idx++;
         }
         return all;
     }
 
+    public IndexIterator<K> iterator()
+    {
+        return new IndexIteratorImpl();
+    }
+
+    private class IndexIteratorImpl implements IndexIterator<K>
+    {
+        int currentIdx;
+        K currentKey;
+        int currentOffset;
+        int currentSize;
+
+        IndexIteratorImpl()
+        {
+            currentIdx = -1;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return currentIdx < (entryCount - 1);
+        }
+
+        @Override
+        public K currentKey()
+        {
+            return currentKey;
+        }
+
+        @Override
+        public int currentOffset()
+        {
+            return currentOffset;
+        }
+
+        @Override
+        public int currentSize()
+        {
+            return currentSize;
+        }
+
+        public void next()
+        {
+            currentIdx++;
+            currentKey = keyAtIndex(currentIdx);
+            long record = recordAtIndex(currentIdx);
+            currentOffset = Index.readOffset(record);
+            currentSize = Index.readSize(record);
+        }
+    }
     private K keyAtIndex(int index)
     {
         return keySupport.deserialize(buffer, FILE_PREFIX_SIZE + index * 
ENTRY_SIZE, descriptor.userVersion);
     }
 
-    private int offsetAtIndex(int index)
+    private long recordAtIndex(int index)
     {
-        return buffer.getInt(FILE_PREFIX_SIZE + index * ENTRY_SIZE + KEY_SIZE);
+        return buffer.getLong(FILE_PREFIX_SIZE + index * ENTRY_SIZE + 
KEY_SIZE);
     }
 
     /*
diff --git a/src/java/org/apache/cassandra/journal/Params.java 
b/src/java/org/apache/cassandra/journal/Params.java
index 46b382ea27..17e719ce5d 100644
--- a/src/java/org/apache/cassandra/journal/Params.java
+++ b/src/java/org/apache/cassandra/journal/Params.java
@@ -43,6 +43,11 @@ public interface Params
      */
     int flushPeriodMillis();
 
+    default int flushPeriodNanos()
+    {
+        return Math.toIntExact(flushPeriodMillis() * 1_000_000L); // multiply as long; throws ArithmeticException instead of silently wrapping for periods above ~2147 ms
+    }
+
     /**
      * @return milliseconds to block writes for while waiting for a slow disk 
flush to complete
      *         when in {@link FlushMode#PERIODIC} mode
diff --git a/src/java/org/apache/cassandra/journal/Segment.java 
b/src/java/org/apache/cassandra/journal/Segment.java
index 6700cb1445..e548c52128 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -58,12 +58,14 @@ abstract class Segment<K, V> implements Closeable, 
RefCounted<Segment<K, V>>
 
     boolean readFirst(K id, RecordConsumer<K> consumer)
     {
-        int offset = index().lookUpFirst(id);
-        if (offset == -1)
+        long offsetAndSize = index().lookUpFirst(id);
+        if (offsetAndSize == -1)
             return false;
 
         EntrySerializer.EntryHolder<K> into = new 
EntrySerializer.EntryHolder<>();
-        if (read(offset, into))
+        int offset = Index.readOffset(offsetAndSize);
+        int size = Index.readSize(offsetAndSize); // decode from the packed index entry, not from the already-extracted offset
+        if (read(offset, size, into))
         {
             Invariants.checkState(id.equals(into.key), "Index for %s read 
incorrect key: expected %s but read %s", descriptor, id, into.key);
             consumer.accept(descriptor.timestamp, offset, id, into.value, 
into.hosts, descriptor.userVersion);
@@ -74,8 +76,8 @@ abstract class Segment<K, V> implements Closeable, 
RefCounted<Segment<K, V>>
 
     boolean readFirst(K id, EntrySerializer.EntryHolder<K> into)
     {
-        int offset = index().lookUpFirst(id);
-        if (offset == -1 || !read(offset, into))
+        long offsetAndSize = index().lookUpFirst(id);
+        if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), 
Index.readSize(offsetAndSize), into))
             return false;
         Invariants.checkState(id.equals(into.key), "Index for %s read 
incorrect key: expected %s but read %s", descriptor, id, into.key);
         return true;
@@ -83,14 +85,16 @@ abstract class Segment<K, V> implements Closeable, 
RefCounted<Segment<K, V>>
 
     void readAll(K id, EntrySerializer.EntryHolder<K> into, Runnable onEntry)
     {
-        int[] all = index().lookUpAll(id);
+        long[] all = index().lookUpAll(id);
 
         for (int i = 0; i < all.length; i++)
         {
-            Invariants.checkState(read(all[i], into), "Read should always 
return true");
+            int offset = Index.readOffset(all[i]);
+            int size = Index.readSize(all[i]);
+            Invariants.checkState(read(offset, size, into), "Read should 
always return true");
             onEntry.run();
         }
     }
 
-    abstract boolean read(int offset, EntrySerializer.EntryHolder<K> into);
+    abstract boolean read(int offset, int size, EntrySerializer.EntryHolder<K> 
into);
 }
diff --git a/src/java/org/apache/cassandra/journal/SegmentWriter.java 
b/src/java/org/apache/cassandra/journal/SegmentWriter.java
index b8436aed66..09797c363e 100644
--- a/src/java/org/apache/cassandra/journal/SegmentWriter.java
+++ b/src/java/org/apache/cassandra/journal/SegmentWriter.java
@@ -70,10 +70,9 @@ final class SegmentWriter<K> implements Closeable
         int position = position();
         try
         {
-            index.update(key, position);
-            metadata.update(hosts);
-
             EntrySerializer.write(key, record, hosts, keySupport, trackedOut, 
descriptor.userVersion);
+            index.update(key, position, position() - position);
+            metadata.update(hosts);
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java 
b/src/java/org/apache/cassandra/journal/StaticSegment.java
index a25fab0483..f63701771c 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -205,9 +205,9 @@ final class StaticSegment<K, V> extends Segment<K, V>
      * Expects the record to have been written at this offset, but potentially 
not flushed and lost.
      */
     @Override
-    boolean read(int offset, EntrySerializer.EntryHolder<K> into)
+    boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into)
     {
-        ByteBuffer duplicate = buffer.duplicate().position(offset);
+        ByteBuffer duplicate = 
buffer.duplicate().position(offset).limit(offset + size);
         try (DataInputBuffer in = new DataInputBuffer(duplicate, false))
         {
             return EntrySerializer.tryRead(into, keySupport, duplicate, in, 
syncedOffsets.syncedOffset(), descriptor.userVersion);
diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java 
b/test/unit/org/apache/cassandra/journal/IndexTest.java
index 8301ea988e..5e7314a338 100644
--- a/test/unit/org/apache/cassandra/journal/IndexTest.java
+++ b/test/unit/org/apache/cassandra/journal/IndexTest.java
@@ -19,21 +19,28 @@ package org.apache.cassandra.journal;
 
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.utils.Generators;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.TimeUUID;
 import org.quicktheories.core.Gen;
 import org.quicktheories.impl.Constraint;
 
+import static org.apache.cassandra.journal.Index.composeOffsetAndSize;
 import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
@@ -44,7 +51,7 @@ import static org.quicktheories.QuickTheory.qt;
 
 public class IndexTest
 {
-    private static final int[] EMPTY = new int[0];
+    private static final long[] EMPTY = new long[0];
 
     @Test
     public void testInMemoryIndexBasics()
@@ -70,17 +77,18 @@ public class IndexTest
         int val32 = 3200;
         int val33 = 3300;
 
-        index.update(key1, val11);
-        index.update(key2, val21);
-        index.update(key2, val22);
-        index.update(key3, val31);
-        index.update(key3, val32);
-        index.update(key3, val33);
+        index.update(key1, val11, 1);
+        index.update(key2, val21, 2);
+        index.update(key2, val22, 3);
+        index.update(key3, val31, 4);
+        index.update(key3, val32, 5);
+        index.update(key3, val33, 6);
 
         assertArrayEquals(EMPTY, index.lookUp(key0));
-        assertArrayEquals(new int[] { val11 }, index.lookUp(key1));
-        assertArrayEquals(new int[] { val21, val22 }, index.lookUp(key2));
-        assertArrayEquals(new int[] { val31, val32, val33 }, 
index.lookUp(key3));
+
+        assertArrayEquals(new long[] { composeOffsetAndSize(val11, 1) }, 
index.lookUp(key1));
+        assertArrayEquals(new long[] { composeOffsetAndSize(val21, 2), 
composeOffsetAndSize(val22, 3) }, index.lookUp(key2));
+        assertArrayEquals(new long[] { composeOffsetAndSize(val31, 4), 
composeOffsetAndSize(val32, 5), composeOffsetAndSize(val33, 6) }, 
index.lookUp(key3));
         assertArrayEquals(EMPTY, index.lookUp(key4));
 
         assertEquals(key1, index.firstId());
@@ -111,12 +119,12 @@ public class IndexTest
         int val32 = 3200;
         int val33 = 3300;
 
-        inMemory.update(key1, val11);
-        inMemory.update(key2, val21);
-        inMemory.update(key2, val22);
-        inMemory.update(key3, val31);
-        inMemory.update(key3, val32);
-        inMemory.update(key3, val33);
+        inMemory.update(key1, val11, 1);
+        inMemory.update(key2, val21, 2);
+        inMemory.update(key2, val22, 3);
+        inMemory.update(key3, val31, 4);
+        inMemory.update(key3, val32, 5);
+        inMemory.update(key3, val33, 6);
 
         File directory = new File(Files.createTempDirectory(null));
         directory.deleteOnExit();
@@ -126,9 +134,9 @@ public class IndexTest
         try (OnDiskIndex<TimeUUID> onDisk = OnDiskIndex.open(descriptor, 
TimeUUIDKeySupport.INSTANCE))
         {
             assertArrayEquals(EMPTY, onDisk.lookUp(key0));
-            assertArrayEquals(new int[] { val11 }, onDisk.lookUp(key1));
-            assertArrayEquals(new int[] { val21, val22 }, onDisk.lookUp(key2));
-            assertArrayEquals(new int[] { val31, val32, val33 }, 
onDisk.lookUp(key3));
+            assertArrayEquals(new long[] { composeOffsetAndSize(val11, 1) }, 
onDisk.lookUp(key1));
+            assertArrayEquals(new long[] { composeOffsetAndSize(val21, 2), 
composeOffsetAndSize(val22, 3) }, onDisk.lookUp(key2));
+            assertArrayEquals(new long[] { composeOffsetAndSize(val31, 4), 
composeOffsetAndSize(val32, 5), composeOffsetAndSize(val33, 6) }, 
onDisk.lookUp(key3));
             assertArrayEquals(EMPTY, onDisk.lookUp(key4));
 
             assertEquals(key1, onDisk.firstId());
@@ -149,34 +157,34 @@ public class IndexTest
         Constraint valueSizeConstraint = Constraint.between(0, 10);
         Constraint positionConstraint = Constraint.between(0, 
Integer.MAX_VALUE);
         Gen<TimeUUID> keyGen = Generators.timeUUID();
-        Gen<int[]> valueGen = rs -> {
-            int[] array = new int[(int) rs.next(valueSizeConstraint)];
+        Gen<long[]> valueGen = rs -> {
+            long[] array = new long[(int) rs.next(valueSizeConstraint)];
             IntHashSet uniq = new IntHashSet();
             for (int i = 0; i < array.length; i++)
             {
-                int value = (int) rs.next(positionConstraint);
-                while (!uniq.add(value))
-                    value = (int) rs.next(positionConstraint);
-                array[i] = value;
+                int offset = (int) rs.next(positionConstraint);
+                while (!uniq.add(offset))
+                    offset = (int) rs.next(positionConstraint);
+                array[i] = Index.composeOffsetAndSize(offset, (int) 
rs.next(positionConstraint));
             }
             return array;
         };
-        Gen<Map<TimeUUID, int[]>> gen = rs -> {
+        Gen<Map<TimeUUID, long[]>> gen = rs -> {
             int size = (int) rs.next(sizeConstraint);
-            Map<TimeUUID, int[]> map = Maps.newHashMapWithExpectedSize(size);
+            Map<TimeUUID, long[]> map = Maps.newHashMapWithExpectedSize(size);
             for (int i = 0; i < size; i++)
             {
                 TimeUUID key = keyGen.generate(rs);
                 while (map.containsKey(key))
                     key = keyGen.generate(rs);
-                int[] value = valueGen.generate(rs);
+                long[] value = valueGen.generate(rs);
                 map.put(key, value);
             }
             return map;
         };
         gen = gen.describedAs(map -> {
             StringBuilder sb = new StringBuilder();
-            for (Map.Entry<TimeUUID, int[]> entry : map.entrySet())
+            for (Map.Entry<TimeUUID, long[]> entry : map.entrySet())
                 
sb.append('\n').append(entry.getKey()).append('\t').append(Arrays.toString(entry.getValue()));
             return sb.toString();
         });
@@ -185,19 +193,19 @@ public class IndexTest
         qt().forAll(gen).checkAssert(map -> test(directory, map));
     }
 
-    private static void test(File directory, Map<TimeUUID, int[]> map)
+    private static void test(File directory, Map<TimeUUID, long[]> map)
     {
         InMemoryIndex<TimeUUID> inMemory = 
InMemoryIndex.create(TimeUUIDKeySupport.INSTANCE);
-        for (Map.Entry<TimeUUID, int[]> e : map.entrySet())
+        for (Map.Entry<TimeUUID, long[]> e : map.entrySet())
         {
             TimeUUID key = e.getKey();
             assertThat(inMemory.lookUp(key)).isEmpty();
 
-            int[] value = e.getValue();
+            long[] value = e.getValue();
             if (value.length == 0)
                 continue;
-            for (int i : value)
-                inMemory.update(key, i);
+            for (long i : value)
+                inMemory.update(key, Index.readOffset(i), Index.readSize(i));
             Arrays.sort(value);
         }
         assertIndex(map, inMemory);
@@ -208,10 +216,29 @@ public class IndexTest
         try (OnDiskIndex<TimeUUID> onDisk = OnDiskIndex.open(descriptor, 
TimeUUIDKeySupport.INSTANCE))
         {
             assertIndex(map, onDisk);
+
+            List<Pair<TimeUUID, Long>> sortedEntries = new ArrayList<>();
+            for (Map.Entry<TimeUUID, long[]> entry : new 
TreeMap<>(map).entrySet())
+            {
+                for (long l : entry.getValue())
+                    sortedEntries.add(Pair.create(entry.getKey(), l));
+            }
+
+            Index.IndexIterator<TimeUUID> iter = onDisk.iterator();
+            Iterator<Pair<TimeUUID, Long>> expectedIter = 
sortedEntries.iterator();
+            while (iter.hasNext())
+            {
+                iter.next();
+                Pair<TimeUUID, Long> expected = expectedIter.next();
+                Assert.assertEquals(iter.currentKey(), expected.left);
+                Assert.assertEquals(iter.currentSize(), 
Index.readSize(expected.right));
+                Assert.assertEquals(iter.currentOffset(), 
Index.readOffset(expected.right));
+            }
         }
     }
 
-    private static void assertIndex(Map<TimeUUID, int[]> expected, 
Index<TimeUUID> actual)
+
+    private static void assertIndex(Map<TimeUUID, long[]> expected, 
Index<TimeUUID> actual)
     {
         List<TimeUUID> keys = expected.entrySet()
                                       .stream()
@@ -231,11 +258,11 @@ public class IndexTest
             assertThat(actual.lastId()).describedAs("Index %s had wrong 
lastId", actual).isEqualTo(keys.get(keys.size() - 1));
         }
 
-        for (Map.Entry<TimeUUID, int[]> e : expected.entrySet())
+        for (Map.Entry<TimeUUID, long[]> e : expected.entrySet())
         {
             TimeUUID key = e.getKey();
-            int[] value = e.getValue();
-            int[] read = actual.lookUp(key);
+            long[] value = e.getValue();
+            long[] read = actual.lookUp(key);
 
             if (value.length == 0)
             {
@@ -248,4 +275,21 @@ public class IndexTest
             }
         }
     }
+
+    @Test
+    public void testHelperMethods()
+    {
+        Random r = new Random();
+        for (int i = 0; i < 1000000; i++) // round-trip random non-negative (size, offset) pairs
+        {
+            long record = 0;
+            int size = r.nextInt() & Integer.MAX_VALUE; // mask sign bit: Math.abs(Integer.MIN_VALUE) is negative
+            record = Index.writeSize(record, size);
+            int offset = r.nextInt() & Integer.MAX_VALUE; // same masking, yields [0, Integer.MAX_VALUE]
+            record = Index.writeOffset(record, offset);
+            assertEquals(size, Index.readSize(record));
+            assertEquals(offset, Index.readOffset(record));
+            assertEquals(record, composeOffsetAndSize(offset, size));
+        }
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to