jiazhai closed pull request #828: DbLedgerStorage -- Write cache
URL: https://github.com/apache/bookkeeper/pull/828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
new file mode 100644
index 000000000..719b33dbc
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Sorts an array of longs in place, treating it as a sequence of fixed-size tuples (groups).
+ *
+ * <p>Group size decides how many longs are included in each tuple and key size controls how many
+ * of the leading items of a tuple are used for comparison. Tuples are compared item by item on
+ * their key and are always moved as a whole.
+ *
+ * <p>The sort is not stable: the relative order of tuples with equal keys is unspecified.
+ */
+public class ArrayGroupSort {
+
+    private final int keySize;
+    private final int groupSize;
+
+    /**
+     * Creates a sorter for tuples of {@code groupSize} longs compared on their first
+     * {@code keySize} items.
+     *
+     * @param keySize number of leading items of each tuple used for comparison; must be positive
+     * @param groupSize number of longs in each tuple; must be positive and at least {@code keySize}
+     * @throws IllegalArgumentException if any of the constraints above is violated
+     */
+    public ArrayGroupSort(int keySize, int groupSize) {
+        checkArgument(keySize > 0);
+        checkArgument(groupSize > 0);
+        checkArgument(keySize <= groupSize, "keySize need to be less or equal the groupSize");
+        this.keySize = keySize;
+        this.groupSize = groupSize;
+    }
+
+    /**
+     * Sorts the whole array in place.
+     *
+     * @param array the tuples to sort; its length must be a multiple of the group size
+     * @throws IllegalArgumentException if the array length is not a multiple of the group size
+     */
+    public void sort(long[] array) {
+        sort(array, 0, array.length);
+    }
+
+    /**
+     * Sorts in place the tuples stored in {@code array[offset .. offset + length)}.
+     *
+     * @param array the array holding the tuples
+     * @param offset index of the first long of the first tuple to sort
+     * @param length number of longs (not tuples) to sort; must be a multiple of the group size
+     * @throws IllegalArgumentException if {@code length} is not a multiple of the group size
+     */
+    public void sort(long[] array, int offset, int length) {
+        checkArgument(length % groupSize == 0, "Array length must be multiple of groupSize");
+        // quickSort works on the start index of the first and of the last group (both inclusive)
+        quickSort(array, offset, (length + offset - groupSize));
+    }
+
+    ////// Private
+
+    /**
+     * Sorts the groups whose start indexes lie in {@code [low, high]}.
+     *
+     * <p>Only the smaller partition is handled recursively while the larger one is handled by
+     * the loop. Each recursive call therefore covers at most half of the groups of its caller,
+     * which keeps the stack depth logarithmic in the number of groups, even for unfavourable
+     * inputs.
+     */
+    private void quickSort(long[] array, int low, int high) {
+        while (low < high) {
+            int pivotIdx = partition(array, low, high);
+            if (pivotIdx - low < high - pivotIdx) {
+                quickSort(array, low, pivotIdx - groupSize);
+                low = pivotIdx + groupSize;
+            } else {
+                quickSort(array, pivotIdx + groupSize, high);
+                high = pivotIdx - groupSize;
+            }
+        }
+    }
+
+    /**
+     * Partitions the groups in {@code [low, high]} around a pivot group.
+     *
+     * @return the start index at which the pivot group ended up; every group before it compares
+     *         less than the pivot, and no group after it does
+     */
+    private int partition(long[] array, int low, int high) {
+        // Take the middle group as pivot instead of the last one, so that input which is already
+        // sorted (or reverse sorted) splits evenly instead of degrading to quadratic time.
+        int groupsInRange = (high - low) / groupSize + 1;
+        int middle = low + (groupsInRange / 2) * groupSize;
+        swap(array, middle, high);
+
+        int pivotIdx = high;
+        int i = low;
+
+        for (int j = low; j < high; j += groupSize) {
+            if (isLess(array, j, pivotIdx)) {
+                swap(array, j, i);
+                i += groupSize;
+            }
+        }
+
+        swap(array, i, high);
+        return i;
+    }
+
+    /** Exchanges the whole groups starting at indexes {@code a} and {@code b}. */
+    private void swap(long[] array, int a, int b) {
+        if (a == b) {
+            return;
+        }
+
+        long tmp;
+        for (int k = 0; k < groupSize; k++) {
+            tmp = array[a + k];
+            array[a + k] = array[b + k];
+            array[b + k] = tmp;
+        }
+    }
+
+    /**
+     * Compares the keys of the groups starting at {@code idx1} and {@code idx2}.
+     *
+     * @return {@code true} only if the first key is strictly smaller; equal keys yield {@code false}
+     */
+    private boolean isLess(long[] array, int idx1, int idx2) {
+        for (int i = 0; i < keySize; i++) {
+            long k1 = array[idx1 + i];
+            long k2 = array[idx2 + i];
+            if (k1 < k2) {
+                return true;
+            } else if (k1 > k2) {
+                return false;
+            }
+        }
+
+        return false;
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
new file mode 100644
index 000000000..1544b6d64
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -0,0 +1,303 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Write cache implementation.
+ *
+ * <p>The write cache will allocate the requested size from direct memory and 
it
+ * will break it down into multiple segments.
+ *
+ * <p>The entries are appended in a common buffer and indexed through a hashmap,
+ * until the cache is cleared.
+ *
+ * <p>There is the possibility to iterate through the stored entries in an 
ordered
+ * way, by (ledgerId, entry).
+ */
+public class WriteCache implements Closeable {
+
+    /**
+     * Consumer that is used to scan the entire write cache.
+     *
+     * <p>{@code accept} is called once per scanned entry with the ledger id, the entry id and
+     * a buffer positioned on the entry payload.
+     */
+    public interface EntryConsumer {
+        void accept(long ledgerId, long entryId, ByteBuf entry);
+    }
+
+    private final ConcurrentLongLongPairHashMap index =
+            new ConcurrentLongLongPairHashMap(4096, 2 * 
Runtime.getRuntime().availableProcessors());
+
+    private final ConcurrentLongLongHashMap lastEntryMap =
+            new ConcurrentLongLongHashMap(4096, 2 * 
Runtime.getRuntime().availableProcessors());
+
+    private final ByteBuf[] cacheSegments;
+    private final int segmentsCount;
+
+    private final long maxCacheSize;
+    private final int maxSegmentSize;
+    private final long segmentOffsetMask;
+    private final long segmentOffsetBits;
+
+    private final AtomicLong cacheSize = new AtomicLong(0);
+    private final AtomicLong cacheOffset = new AtomicLong(0);
+    private final LongAdder cacheCount = new LongAdder();
+
+    private final ConcurrentLongHashSet deletedLedgers = new 
ConcurrentLongHashSet();
+
+    /**
+     * Creates a write cache using the default maximum segment size.
+     *
+     * @param maxCacheSize total capacity of the cache, in bytes
+     */
+    public WriteCache(long maxCacheSize) {
+        // Default maxSegmentSize set to 1Gb (2^30 bytes)
+        this(maxCacheSize, 1 << 30);
+    }
+
+    /**
+     * Creates a write cache of {@code maxCacheSize} bytes, split into direct-memory segments of
+     * {@code maxSegmentSize} bytes each.
+     *
+     * <p>All segments are allocated here. Every segment but the last is full-size; the last one
+     * holds the remainder {@code maxCacheSize % maxSegmentSize}, which may be zero.
+     *
+     * @param maxCacheSize total capacity of the cache, in bytes
+     * @param maxSegmentSize size of each segment, in bytes; must be positive and a power of two
+     * @throws IllegalArgumentException if {@code maxSegmentSize} is not a positive power of two
+     */
+    public WriteCache(long maxCacheSize, int maxSegmentSize) {
+        checkArgument(maxSegmentSize > 0);
+
+        long alignedMaxSegmentSize = alignToPowerOfTwo(maxSegmentSize);
+        checkArgument(maxSegmentSize == alignedMaxSegmentSize, "Max segment size needs to be in form of 2^n");
+
+        this.maxCacheSize = maxCacheSize;
+        this.maxSegmentSize = maxSegmentSize;
+        this.segmentOffsetMask = maxSegmentSize - 1;
+        // maxSegmentSize is 2^k: the low k bits of a global offset are the position inside the
+        // segment (see the mask above) and the remaining high bits are the segment index, so the
+        // shift must be exactly k. For 2^k, numberOfLeadingZeros is 63 - k.
+        this.segmentOffsetBits = 63 - Long.numberOfLeadingZeros(maxSegmentSize);
+
+        this.segmentsCount = 1 + (int) (maxCacheSize / maxSegmentSize);
+
+        this.cacheSegments = new ByteBuf[segmentsCount];
+
+        for (int i = 0; i < segmentsCount - 1; i++) {
+            // All intermediate segments will be full-size
+            cacheSegments[i] = Unpooled.directBuffer(maxSegmentSize, maxSegmentSize);
+        }
+
+        int lastSegmentSize = (int) (maxCacheSize % maxSegmentSize);
+        cacheSegments[segmentsCount - 1] = Unpooled.directBuffer(lastSegmentSize, lastSegmentSize);
+    }
+
+    /**
+     * Resets the cache to the empty state.
+     *
+     * <p>The size, offset and count counters go back to zero and the entry index, the
+     * last-entry map and the set of deleted ledgers are emptied. The segment buffers are neither
+     * released nor overwritten here; they are simply reused by later writes.
+     */
+    public void clear() {
+        cacheSize.set(0L);
+        cacheOffset.set(0L);
+        cacheCount.reset();
+        index.clear();
+        lastEntryMap.clear();
+        deletedLedgers.clear();
+    }
+
+    /**
+     * Releases every segment buffer held by this cache.
+     *
+     * <p>NOTE(review): presumably the cache must not be used after this call, since its
+     * segments have been released -- confirm with callers.
+     */
+    @Override
+    public void close() {
+        for (ByteBuf buf : cacheSegments) {
+            buf.release();
+        }
+    }
+
+    public boolean put(long ledgerId, long entryId, ByteBuf entry) {
+        int size = entry.readableBytes();
+
+        // Align to 64 bytes so that different threads will not contend the 
same L1
+        // cache line
+        int alignedSize = align64(size);
+
+        long offset;
+        int localOffset;
+        int segmentIdx;
+
+        while (true) {
+            offset = cacheOffset.getAndAdd(alignedSize);
+            localOffset = (int) (offset & segmentOffsetMask);
+            segmentIdx = (int) (offset >>> segmentOffsetBits);
+
+            if ((offset + size) > maxCacheSize) {
+                // Cache is full
+                return false;
+            } else if (maxSegmentSize - localOffset < size) {
+                // If an entry is at the end of a segment, we need to get a 
new offset and try
+                // again in next segment
+                continue;
+            } else {
+                // Found a good offset
+                break;
+            }
+        }
+
+        cacheSegments[segmentIdx].setBytes(localOffset, entry, 
entry.readerIndex(), entry.readableBytes());
+
+        // Update last entryId for ledger. This logic is to handle writes for 
the same
+        // ledger coming out of order and from different thread, though in 
practice it
+        // should not happen and the compareAndSet should be always 
uncontended.
+        while (true) {
+            long currentLastEntryId = lastEntryMap.get(ledgerId);
+            if (currentLastEntryId > entryId) {
+                // A newer entry is already there
+                break;
+            }
+
+            if (lastEntryMap.compareAndSet(ledgerId, currentLastEntryId, 
entryId)) {
+                break;
+            }
+        }
+
+        index.put(ledgerId, entryId, offset, size);
+        cacheCount.increment();
+        cacheSize.addAndGet(size);
+        return true;
+    }
+
+    /**
+     * Looks up an entry and returns a copy of its payload.
+     *
+     * @param ledgerId ledger the entry belongs to
+     * @param entryId id of the entry within the ledger
+     * @return a buffer newly allocated from {@code ByteBufAllocator.DEFAULT} holding a copy of
+     *         the entry, or {@code null} if the entry is not indexed in this cache.
+     *         NOTE(review): ownership of the returned buffer presumably passes to the caller,
+     *         who has to release it -- confirm.
+     */
+    public ByteBuf get(long ledgerId, long entryId) {
+        final LongPair location = index.get(ledgerId, entryId);
+        if (location == null) {
+            return null;
+        }
+
+        // The index stores (global offset, size) for each entry
+        final long offset = location.first;
+        final int size = (int) location.second;
+        final int segmentIdx = (int) (offset >>> segmentOffsetBits);
+        final int localOffset = (int) (offset & segmentOffsetMask);
+
+        final ByteBuf copy = ByteBufAllocator.DEFAULT.buffer(size, size);
+        copy.writeBytes(cacheSegments[segmentIdx], localOffset, size);
+        return copy;
+    }
+
+    /**
+     * Returns a copy of the entry with the highest entry id recorded for a ledger.
+     *
+     * @param ledgerId ledger to look up
+     * @return the last entry as returned by {@link #get(long, long)}, or {@code null} if the
+     *         ledger is not found in the write cache
+     */
+    public ByteBuf getLastEntry(long ledgerId) {
+        final long lastEntryId = lastEntryMap.get(ledgerId);
+
+        // -1 means the ledger was not found in the write cache
+        return lastEntryId == -1 ? null : get(ledgerId, lastEntryId);
+    }
+
+    /**
+     * Marks a ledger as deleted.
+     *
+     * <p>Only the ledger id is recorded here; nothing is removed from the index or the counters.
+     * {@code forEach} skips the entries of ledgers recorded this way.
+     *
+     * @param ledgerId the ledger to mark as deleted
+     */
+    public void deleteLedger(long ledgerId) {
+        deletedLedgers.add(ledgerId);
+    }
+
+    private static final ArrayGroupSort groupSorter = new ArrayGroupSort(2, 4);
+
+    /**
+     * Passes every cached entry to {@code consumer}, ordered by {@code (ledgerId, entryId)}.
+     *
+     * <p>Entries of ledgers marked through {@code deleteLedger} are skipped. The whole scan,
+     * including the consumer callbacks, runs while holding {@code sortedEntriesLock}.
+     *
+     * <p>The buffer handed to the consumer is a view over a cache segment whose reader and
+     * writer indexes are repositioned for each entry. The same view is reused for later entries
+     * of that segment, so the consumer should not keep it after returning.
+     *
+     * @param consumer callback invoked once per entry
+     */
+    public void forEach(EntryConsumer consumer) {
+        sortedEntriesLock.lock();
+
+        try {
+            // Each entry takes 4 longs in the scratch array: ledgerId, entryId, offset, length
+            final int requiredLen = ((int) index.size()) * 4;
+            if (sortedEntries == null || sortedEntries.length < requiredLen) {
+                // Allocates twice the size currently required, presumably as headroom for reuse
+                sortedEntries = new long[requiredLen * 2];
+            }
+
+            long phaseStart = MathUtils.nowInNano();
+
+            sortedEntriesIdx = 0;
+            index.forEach((ledgerId, entryId, offset, length) -> {
+                // Ignore deleted ledgers
+                if (!deletedLedgers.contains(ledgerId)) {
+                    final int base = sortedEntriesIdx;
+                    sortedEntries[base] = ledgerId;
+                    sortedEntries[base + 1] = entryId;
+                    sortedEntries[base + 2] = offset;
+                    sortedEntries[base + 3] = length;
+                    sortedEntriesIdx = base + 4;
+                }
+            });
+
+            if (log.isDebugEnabled()) {
+                log.debug("iteration took {} ms", MathUtils.elapsedNanos(phaseStart) / 1e6);
+            }
+            phaseStart = MathUtils.nowInNano();
+
+            // Sort entries by (ledgerId, entryId) maintaining the 4 items groups
+            groupSorter.sort(sortedEntries, 0, sortedEntriesIdx);
+            if (log.isDebugEnabled()) {
+                log.debug("sorting {} ms", (MathUtils.elapsedNanos(phaseStart) / 1e6));
+            }
+            phaseStart = MathUtils.nowInNano();
+
+            // One view per segment, so the indexes of the cache buffers themselves stay untouched
+            final ByteBuf[] segmentViews = new ByteBuf[segmentsCount];
+            for (int s = 0; s < segmentsCount; s++) {
+                final ByteBuf segment = cacheSegments[s];
+                segmentViews[s] = segment.slice(0, segment.capacity());
+            }
+
+            int cursor = 0;
+            while (cursor < sortedEntriesIdx) {
+                final long ledgerId = sortedEntries[cursor];
+                final long entryId = sortedEntries[cursor + 1];
+                final long offset = sortedEntries[cursor + 2];
+                final int length = (int) sortedEntries[cursor + 3];
+
+                final int localOffset = (int) (offset & segmentOffsetMask);
+                final int segmentIdx = (int) (offset >>> segmentOffsetBits);
+
+                final ByteBuf view = segmentViews[segmentIdx];
+                view.setIndex(localOffset, localOffset + length);
+                consumer.accept(ledgerId, entryId, view);
+
+                cursor += 4;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("entry log adding {} ms", MathUtils.elapsedNanos(phaseStart) / 1e6);
+            }
+        } finally {
+            sortedEntriesLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the current value of the cache size counter, which {@code put} increases by the
+     * (unaligned) size of each stored entry and {@code clear} resets to zero.
+     */
+    public long size() {
+        return cacheSize.get();
+    }
+
+    /**
+     * Returns the number of successful {@code put} calls since the cache was created or last
+     * cleared. Marking a ledger as deleted does not change this counter.
+     */
+    public long count() {
+        return cacheCount.sum();
+    }
+
+    /**
+     * Returns {@code true} when the cache size counter is zero, i.e. no payload bytes have been
+     * stored since the cache was created or last cleared.
+     */
+    public boolean isEmpty() {
+        return cacheSize.get() == 0L;
+    }
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    static int align64(int size) {
+        return (size + 64 - 1) & ALIGN_64_MASK;
+    }
+
+    private static long alignToPowerOfTwo(long n) {
+        return (long) Math.pow(2, 64 - Long.numberOfLeadingZeros(n - 1));
+    }
+
+    private final ReentrantLock sortedEntriesLock = new ReentrantLock();
+    private long[] sortedEntries;
+    private int sortedEntriesIdx;
+
+    private static final Logger log = 
LoggerFactory.getLogger(WriteCache.class);
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
new file mode 100644
index 000000000..6c6cd8c92
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
@@ -0,0 +1,25 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Classes related to DB based ledger storage.
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
\ No newline at end of file
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
new file mode 100644
index 000000000..52fdf8480
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link ArrayGroupSort}.
+ */
+public class ArraySortGroupTest {
+
+    @Test
+    public void simple() {
+        long[] data = new long[] { //
+                1, 2, 3, 4, //
+                5, 6, 3, 1, //
+                4, 8, 1, 2, //
+                4, 5, 12, 10, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                3, 3, 3, 3, //
+        };
+
+        long[] expectedSorted = new long[] { //
+                1, 2, 3, 4, //
+                3, 3, 3, 3, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                4, 5, 12, 10, //
+                4, 8, 1, 2, //
+                5, 6, 3, 1, //
+        };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void keySmallerThanTotalSize() {
+        new ArrayGroupSort(3, 2);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void negativeKeySize() {
+        new ArrayGroupSort(-1, 2);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void negativeTotalSize() {
+        new ArrayGroupSort(1, -1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void arraySizeIsNotMultiple() {
+        ArrayGroupSort sorter = new ArrayGroupSort(1, 3);
+        sorter.sort(new long[] { 1, 2, 3, 4 });
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void arraySizeIsShorterThanRequired() {
+        ArrayGroupSort sorter = new ArrayGroupSort(1, 3);
+        sorter.sort(new long[] { 1, 2 });
+    }
+
+    @Test
+    public void emptyArray() {
+        long[] data = new long[] {};
+
+        long[] expectedSorted = new long[] {};
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void singleItem() {
+        long[] data = new long[] { 1, 2, 3, 4 };
+        long[] expectedSorted = new long[] { 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void twoItems() {
+        long[] data = new long[] { 1, 2, 3, 4, 1, 1, 5, 5 };
+        long[] expectedSorted = new long[] { 1, 1, 5, 5, 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void threeItems() {
+        long[] data = new long[] { 1, 2, 3, 4, 1, 1, 5, 5, 1, 0, 2, 1 };
+        long[] expectedSorted = new long[] { 1, 0, 2, 1, 1, 1, 5, 5, 1, 2, 3, 
4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
new file mode 100644
index 000000000..f5b0599ed
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
@@ -0,0 +1,261 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link WriteCache}.
+ */
+public class WriteCacheTest {
+
+    @Test
+    public void simple() throws Exception {
+        WriteCache cache = new WriteCache(10 * 1024);
+
+        ByteBuf entry1 = PooledByteBufAllocator.DEFAULT.buffer(1024);
+        ByteBufUtil.writeUtf8(entry1, "entry-1");
+        entry1.writerIndex(entry1.capacity());
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        cache.put(1, 1, entry1);
+
+        assertFalse(cache.isEmpty());
+        assertEquals(1, cache.count());
+        assertEquals(entry1.readableBytes(), cache.size());
+
+        assertEquals(entry1, cache.get(1, 1));
+        assertNull(cache.get(1, 2));
+        assertNull(cache.get(2, 1));
+
+        assertEquals(entry1, cache.getLastEntry(1));
+        assertEquals(null, cache.getLastEntry(2));
+
+        cache.clear();
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        entry1.release();
+        cache.close();
+    }
+
+    @Test
+    public void cacheFull() throws Exception {
+        int cacheSize = 10 * 1024;
+        int entrySize = 1024;
+        int entriesCount = cacheSize / entrySize;
+
+        WriteCache cache = new WriteCache(cacheSize);
+
+        ByteBuf entry = PooledByteBufAllocator.DEFAULT.buffer(entrySize);
+        entry.writerIndex(entry.capacity());
+
+        for (int i = 0; i < entriesCount; i++) {
+            assertTrue(cache.put(1, i, entry));
+        }
+
+        assertFalse(cache.put(1, 11, entry));
+
+        assertFalse(cache.isEmpty());
+        assertEquals(entriesCount, cache.count());
+        assertEquals(cacheSize, cache.size());
+
+        AtomicInteger findCount = new AtomicInteger(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(entriesCount, findCount.get());
+
+        cache.deleteLedger(1);
+
+        findCount.set(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(0, findCount.get());
+
+        entry.release();
+        cache.close();
+    }
+
+    @Test
+    public void testMultipleSegments() {
+        // Create cache with max size 1Mb and each segment is 16Kb
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writerIndex(entry.capacity());
+
+        for (int i = 0; i < 48; i++) {
+            cache.put(1, i, entry);
+        }
+
+        assertEquals(48, cache.count());
+        assertEquals(48 * 1024, cache.size());
+
+        cache.close();
+    }
+
+    @Test
+    public void testEmptyCache() {
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+        assertTrue(cache.isEmpty());
+
+        AtomicLong foundEntries = new AtomicLong();
+        cache.forEach((ledgerId, entryId, entry) -> {
+            foundEntries.incrementAndGet();
+        });
+
+        assertEquals(0, foundEntries.get());
+        cache.close();
+    }
+
+    @Test
+    public void testMultipleWriters() throws Exception {
+        // Create cache with max size 10Mb and each segment is 16Kb
+        WriteCache cache = new WriteCache(10 * 1024 * 1024, 16 * 1024);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        int numThreads = 10;
+        int entriesPerThread = 10 * 1024 / numThreads;
+
+        CyclicBarrier barrier = new CyclicBarrier(numThreads);
+        CountDownLatch latch = new CountDownLatch(numThreads);
+
+        for (int i = 0; i < numThreads; i++) {
+            int ledgerId = i;
+
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (InterruptedException | BrokenBarrierException e) {
+                    throw new RuntimeException(e);
+                }
+
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writerIndex(entry.capacity());
+
+                for (int entryId = 0; entryId < entriesPerThread; entryId++) {
+                    assertTrue(cache.put(ledgerId, entryId, entry));
+                }
+
+                latch.countDown();
+            });
+        }
+
+        // Wait for all tasks to be completed
+        latch.await();
+
+        // assertEquals(numThreads * entriesPerThread, cache.count());
+        assertEquals(cache.count() * 1024, cache.size());
+
+        // Verify entries by iterating over write cache
+        AtomicLong currentLedgerId = new AtomicLong(0);
+        AtomicLong currentEntryId = new AtomicLong(0);
+
+        cache.forEach((ledgerId, entryId, entry) -> {
+            assertEquals(currentLedgerId.get(), ledgerId);
+            assertEquals(currentEntryId.get(), entryId);
+
+            if (currentEntryId.incrementAndGet() == entriesPerThread) {
+                currentLedgerId.incrementAndGet();
+                currentEntryId.set(0);
+            }
+        });
+
+        cache.close();
+        executor.shutdown();
+    }
+
+    @Test
+    public void testLedgerDeletion() {
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writerIndex(entry.capacity());
+
+        for (long ledgerId = 0; ledgerId < 10; ledgerId++) {
+            for (int entryId = 0; entryId < 10; entryId++) {
+                cache.put(ledgerId, entryId, entry);
+            }
+        }
+
+        assertEquals(100, cache.count());
+        assertEquals(100 * 1024, cache.size());
+
+        cache.deleteLedger(5);
+
+        // Entries are not immediately deleted, just ignored on scan
+        assertEquals(100, cache.count());
+        assertEquals(100 * 1024, cache.size());
+
+        // Verify entries by iterating over write cache
+        AtomicLong currentLedgerId = new AtomicLong(0);
+        AtomicLong currentEntryId = new AtomicLong(0);
+
+        cache.forEach((ledgerId, entryId, e) -> {
+            assertEquals(currentLedgerId.get(), ledgerId);
+            assertEquals(currentEntryId.get(), entryId);
+
+            if (currentEntryId.incrementAndGet() == 10) {
+                currentLedgerId.incrementAndGet();
+                currentEntryId.set(0);
+
+                if (currentLedgerId.get() == 5) {
+                    // Ledger 5 was deleted
+                    currentLedgerId.incrementAndGet();
+                }
+            }
+        });
+
+        cache.close();
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to