This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 271e6fa  DbLedgerStorage -- Write cache
271e6fa is described below

commit 271e6faa3ace16c1ca52e55caac3d76d26d1f6ab
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Dec 13 16:47:17 2017 +0800

    DbLedgerStorage -- Write cache
    
    1st part of DbLedgerStorage related changes.
    
    This PR introduces a `WriteCache` class that is used to store entries before they get flushed to the entry logs.
    
    The key part is that it provides a way to iterate in order over the entries and it is garbage-free at steady state.
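
    A minimal sketch of how a caller is expected to drive the new class (all
    names come from the diff below; the cache size, the payload, and the
    flush handling are illustrative assumptions, not part of this patch):

        WriteCache cache = new WriteCache(16 * 1024 * 1024); // example size
        ByteBuf entry = Unpooled.buffer(128);                // example payload
        entry.writerIndex(entry.capacity());

        if (!cache.put(1L /* ledgerId */, 0L /* entryId */, entry)) {
            // Cache full: flush it to the entry log, then clear() and retry
        }

        // Flush path: ordered, garbage-free scan by (ledgerId, entryId)
        cache.forEach((ledgerId, entryId, buf) -> {
            // append buf to the entry log
        });
        cache.clear();
        cache.close();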
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie Guo <[email protected]>
    
    This closes #828 from merlimat/db-ledger-storage-itemized
---
 .../bookie/storage/ldb/ArrayGroupSort.java         | 100 +++++++
 .../bookkeeper/bookie/storage/ldb/WriteCache.java  | 303 +++++++++++++++++++++
 .../bookie/storage/ldb/package-info.java           |  25 ++
 .../bookie/storage/ldb/ArraySortGroupTest.java     | 132 +++++++++
 .../bookie/storage/ldb/WriteCacheTest.java         | 261 ++++++++++++++++++
 5 files changed, 821 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
new file mode 100644
index 0000000..719b33d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Sort an array of longs, grouping the items in tuples.
+ *
+ * <p>Group size decides how many longs are included in the tuples and key size controls how many items to use for
+ * comparison.
+ */
+public class ArrayGroupSort {
+
+    private final int keySize;
+    private final int groupSize;
+
+    public ArrayGroupSort(int keySize, int groupSize) {
+        checkArgument(keySize > 0);
+        checkArgument(groupSize > 0);
+        checkArgument(keySize <= groupSize, "keySize needs to be less than or equal to groupSize");
+        this.keySize = keySize;
+        this.groupSize = groupSize;
+    }
+
+    public void sort(long[] array) {
+        sort(array, 0, array.length);
+    }
+
+    public void sort(long[] array, int offset, int length) {
+        checkArgument(length % groupSize == 0, "Array length must be a multiple of groupSize");
+        quickSort(array, offset, (length + offset - groupSize));
+    }
+
+    ////// Private
+
+    private void quickSort(long[] array, int low, int high) {
+        if (low < high) {
+            int pivotIdx = partition(array, low, high);
+            quickSort(array, low, pivotIdx - groupSize);
+            quickSort(array, pivotIdx + groupSize, high);
+        }
+    }
+
+    private int partition(long[] array, int low, int high) {
+        int pivotIdx = high;
+        int i = low;
+
+        for (int j = low; j < high; j += groupSize) {
+            if (isLess(array, j, pivotIdx)) {
+                swap(array, j, i);
+                i += groupSize;
+            }
+        }
+
+        swap(array, i, high);
+        return i;
+    }
+
+    private void swap(long[] array, int a, int b) {
+        long tmp;
+        for (int k = 0; k < groupSize; k++) {
+            tmp = array[a + k];
+            array[a + k] = array[b + k];
+            array[b + k] = tmp;
+        }
+    }
+
+    private boolean isLess(long[] array, int idx1, int idx2) {
+        for (int i = 0; i < keySize; i++) {
+            long k1 = array[idx1 + i];
+            long k2 = array[idx2 + i];
+            if (k1 < k2) {
+                return true;
+            } else if (k1 > k2) {
+                return false;
+            }
+        }
+
+        return false;
+    }
+}
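
A short worked example of the sorter above (the 4-long groups and 2-long keys
match how WriteCache.forEach, further down in this patch, lays out its index
entries; the concrete values here are made up for illustration):

    // Each group of 4 longs is (ledgerId, entryId, offset, length);
    // only the first keySize = 2 longs take part in comparisons.
    long[] entries = {
        2, 0, 100, 50,   // ledger 2, entry 0
        1, 1, 200, 50,   // ledger 1, entry 1
        1, 0, 300, 50,   // ledger 1, entry 0
    };
    new ArrayGroupSort(2, 4).sort(entries);
    // entries is now (1,0,300,50), (1,1,200,50), (2,0,100,50):
    // ordered by (ledgerId, entryId), with each 4-long group moved as a unit.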
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
new file mode 100644
index 0000000..1544b6d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -0,0 +1,303 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Write cache implementation.
+ *
+ * <p>The write cache will allocate the requested size from direct memory and it
+ * will break it down into multiple segments.
+ *
+ * <p>The entries are appended in a common buffer and indexed through a hashmap,
+ * until the cache is cleared.
+ *
+ * <p>There is the possibility to iterate through the stored entries in an ordered
+ * way, by (ledgerId, entryId).
+ */
+public class WriteCache implements Closeable {
+
+    /**
+     * Consumer that is used to scan the entire write cache.
+     */
+    public interface EntryConsumer {
+        void accept(long ledgerId, long entryId, ByteBuf entry);
+    }
+
+    private final ConcurrentLongLongPairHashMap index =
+            new ConcurrentLongLongPairHashMap(4096, 2 * Runtime.getRuntime().availableProcessors());
+
+    private final ConcurrentLongLongHashMap lastEntryMap =
+            new ConcurrentLongLongHashMap(4096, 2 * Runtime.getRuntime().availableProcessors());
+
+    private final ByteBuf[] cacheSegments;
+    private final int segmentsCount;
+
+    private final long maxCacheSize;
+    private final int maxSegmentSize;
+    private final long segmentOffsetMask;
+    private final long segmentOffsetBits;
+
+    private final AtomicLong cacheSize = new AtomicLong(0);
+    private final AtomicLong cacheOffset = new AtomicLong(0);
+    private final LongAdder cacheCount = new LongAdder();
+
+    private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
+
+    public WriteCache(long maxCacheSize) {
+        // Default maxSegmentSize set to 1Gb
+        this(maxCacheSize, 1 * 1024 * 1024 * 1024);
+    }
+
+    public WriteCache(long maxCacheSize, int maxSegmentSize) {
+        checkArgument(maxSegmentSize > 0);
+
+        long alignedMaxSegmentSize = alignToPowerOfTwo(maxSegmentSize);
+        checkArgument(maxSegmentSize == alignedMaxSegmentSize, "Max segment size needs to be a power of 2");
+
+        this.maxCacheSize = maxCacheSize;
+        this.maxSegmentSize = (int) maxSegmentSize;
+        this.segmentOffsetMask = maxSegmentSize - 1;
+        this.segmentOffsetBits = 63 - Long.numberOfLeadingZeros(maxSegmentSize);
+
+        this.segmentsCount = 1 + (int) (maxCacheSize / maxSegmentSize);
+
+        this.cacheSegments = new ByteBuf[segmentsCount];
+
+        for (int i = 0; i < segmentsCount - 1; i++) {
+            // All intermediate segments will be full-size
+            cacheSegments[i] = Unpooled.directBuffer(maxSegmentSize, maxSegmentSize);
+        }
+
+        int lastSegmentSize = (int) (maxCacheSize % maxSegmentSize);
+        cacheSegments[segmentsCount - 1] = Unpooled.directBuffer(lastSegmentSize, lastSegmentSize);
+    }
+
+    public void clear() {
+        cacheSize.set(0L);
+        cacheOffset.set(0L);
+        cacheCount.reset();
+        index.clear();
+        lastEntryMap.clear();
+        deletedLedgers.clear();
+    }
+
+    @Override
+    public void close() {
+        for (ByteBuf buf : cacheSegments) {
+            buf.release();
+        }
+    }
+
+    public boolean put(long ledgerId, long entryId, ByteBuf entry) {
+        int size = entry.readableBytes();
+
+        // Align to 64 bytes so that different threads will not contend the same L1
+        // cache line
+        int alignedSize = align64(size);
+
+        long offset;
+        int localOffset;
+        int segmentIdx;
+
+        while (true) {
+            offset = cacheOffset.getAndAdd(alignedSize);
+            localOffset = (int) (offset & segmentOffsetMask);
+            segmentIdx = (int) (offset >>> segmentOffsetBits);
+
+            if ((offset + size) > maxCacheSize) {
+                // Cache is full
+                return false;
+            } else if (maxSegmentSize - localOffset < size) {
+                // If an entry is at the end of a segment, we need to get a new offset and try
+                // again in the next segment
+                continue;
+            } else {
+                // Found a good offset
+                break;
+            }
+        }
+
+        cacheSegments[segmentIdx].setBytes(localOffset, entry, entry.readerIndex(), entry.readableBytes());
+
+        // Update last entryId for ledger. This logic is to handle writes for the same
+        // ledger coming out of order and from different threads, though in practice it
+        // should not happen and the compareAndSet should always be uncontended.
+        while (true) {
+            long currentLastEntryId = lastEntryMap.get(ledgerId);
+            if (currentLastEntryId > entryId) {
+                // A newer entry is already there
+                break;
+            }
+
+            if (lastEntryMap.compareAndSet(ledgerId, currentLastEntryId, entryId)) {
+                break;
+            }
+        }
+
+        index.put(ledgerId, entryId, offset, size);
+        cacheCount.increment();
+        cacheSize.addAndGet(size);
+        return true;
+    }
+
+    public ByteBuf get(long ledgerId, long entryId) {
+        LongPair result = index.get(ledgerId, entryId);
+        if (result == null) {
+            return null;
+        }
+
+        long offset = result.first;
+        int size = (int) result.second;
+        ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size, size);
+
+        int localOffset = (int) (offset & segmentOffsetMask);
+        int segmentIdx = (int) (offset >>> segmentOffsetBits);
+        entry.writeBytes(cacheSegments[segmentIdx], localOffset, size);
+        return entry;
+    }
+
+    public ByteBuf getLastEntry(long ledgerId) {
+        long lastEntryId = lastEntryMap.get(ledgerId);
+        if (lastEntryId == -1) {
+            // Ledger not found in write cache
+            return null;
+        } else {
+            return get(ledgerId, lastEntryId);
+        }
+    }
+
+    public void deleteLedger(long ledgerId) {
+        deletedLedgers.add(ledgerId);
+    }
+
+    private static final ArrayGroupSort groupSorter = new ArrayGroupSort(2, 4);
+
+    public void forEach(EntryConsumer consumer) {
+        sortedEntriesLock.lock();
+
+        try {
+            int entriesToSort = (int) index.size();
+            int arrayLen = entriesToSort * 4;
+            if (sortedEntries == null || sortedEntries.length < arrayLen) {
+                sortedEntries = new long[(int) (arrayLen * 2)];
+            }
+
+            long startTime = MathUtils.nowInNano();
+
+            sortedEntriesIdx = 0;
+            index.forEach((ledgerId, entryId, offset, length) -> {
+                if (deletedLedgers.contains(ledgerId)) {
+                    // Ignore deleted ledgers
+                    return;
+                }
+
+                sortedEntries[sortedEntriesIdx] = ledgerId;
+                sortedEntries[sortedEntriesIdx + 1] = entryId;
+                sortedEntries[sortedEntriesIdx + 2] = offset;
+                sortedEntries[sortedEntriesIdx + 3] = length;
+                sortedEntriesIdx += 4;
+            });
+
+            if (log.isDebugEnabled()) {
+                log.debug("iteration took {} ms", 
MathUtils.elapsedNanos(startTime) / 1e6);
+            }
+            startTime = MathUtils.nowInNano();
+
+            // Sort entries by (ledgerId, entryId) maintaining the 4-item groups
+            groupSorter.sort(sortedEntries, 0, sortedEntriesIdx);
+            if (log.isDebugEnabled()) {
+                log.debug("sorting {} ms", (MathUtils.elapsedNanos(startTime) 
/ 1e6));
+            }
+            startTime = MathUtils.nowInNano();
+
+            ByteBuf[] entrySegments = new ByteBuf[segmentsCount];
+            for (int i = 0; i < segmentsCount; i++) {
+                entrySegments[i] = cacheSegments[i].slice(0, cacheSegments[i].capacity());
+            }
+
+            for (int i = 0; i < sortedEntriesIdx; i += 4) {
+                long ledgerId = sortedEntries[i];
+                long entryId = sortedEntries[i + 1];
+                long offset = sortedEntries[i + 2];
+                long length = sortedEntries[i + 3];
+
+                int localOffset = (int) (offset & segmentOffsetMask);
+                int segmentIdx = (int) (offset >>> segmentOffsetBits);
+                ByteBuf entry = entrySegments[segmentIdx];
+                entry.setIndex(localOffset, localOffset + (int) length);
+                consumer.accept(ledgerId, entryId, entry);
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("entry log adding {} ms", 
MathUtils.elapsedNanos(startTime) / 1e6);
+            }
+        } finally {
+            sortedEntriesLock.unlock();
+        }
+    }
+
+    public long size() {
+        return cacheSize.get();
+    }
+
+    public long count() {
+        return cacheCount.sum();
+    }
+
+    public boolean isEmpty() {
+        return cacheSize.get() == 0L;
+    }
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    static int align64(int size) {
+        return (size + 64 - 1) & ALIGN_64_MASK;
+    }
+
+    private static long alignToPowerOfTwo(long n) {
+        return (long) Math.pow(2, 64 - Long.numberOfLeadingZeros(n - 1));
+    }
+
+    private final ReentrantLock sortedEntriesLock = new ReentrantLock();
+    private long[] sortedEntries;
+    private int sortedEntriesIdx;
+
+    private static final Logger log = LoggerFactory.getLogger(WriteCache.class);
+}
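
To make the offset arithmetic in put() and get() above concrete, here is a
small worked example, assuming the 16 KB segment size that the tests further
down use (the offset value itself is arbitrary):

    int maxSegmentSize = 16 * 1024;               // 2^14, must be a power of 2
    long segmentOffsetMask = maxSegmentSize - 1;  // low 14 bits -> offset inside segment
    long segmentOffsetBits = 63 - Long.numberOfLeadingZeros(maxSegmentSize); // 14

    long offset = 20000;                          // global offset handed out by cacheOffset
    int localOffset = (int) (offset & segmentOffsetMask);   // 20000 - 16384 = 3616
    int segmentIdx = (int) (offset >>> segmentOffsetBits);  // 20000 / 16384 = 1

    // align64() rounds each entry up to a 64-byte boundary so that writers
    // working on adjacent entries touch different L1 cache lines:
    // align64(100) == 128, align64(64) == 64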
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
new file mode 100644
index 0000000..6c6cd8c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
@@ -0,0 +1,25 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Classes related to DB based ledger storage.
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
new file mode 100644
index 0000000..52fdf84
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link ArrayGroupSort}.
+ */
+public class ArraySortGroupTest {
+
+    @Test
+    public void simple() {
+        long[] data = new long[] { //
+                1, 2, 3, 4, //
+                5, 6, 3, 1, //
+                4, 8, 1, 2, //
+                4, 5, 12, 10, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                3, 3, 3, 3, //
+        };
+
+        long[] expectedSorted = new long[] { //
+                1, 2, 3, 4, //
+                3, 3, 3, 3, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                4, 5, 12, 10, //
+                4, 8, 1, 2, //
+                5, 6, 3, 1, //
+        };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void keySmallerThanTotalSize() {
+        new ArrayGroupSort(3, 2);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void negativeKeySize() {
+        new ArrayGroupSort(-1, 2);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void negativeTotalSize() {
+        new ArrayGroupSort(1, -1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void arraySizeIsNotMultiple() {
+        ArrayGroupSort sorter = new ArrayGroupSort(1, 3);
+        sorter.sort(new long[] { 1, 2, 3, 4 });
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void arraySizeIsShorterThanRequired() {
+        ArrayGroupSort sorter = new ArrayGroupSort(1, 3);
+        sorter.sort(new long[] { 1, 2 });
+    }
+
+    @Test
+    public void emptyArray() {
+        long[] data = new long[] {};
+
+        long[] expectedSorted = new long[] {};
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void singleItem() {
+        long[] data = new long[] { 1, 2, 3, 4 };
+        long[] expectedSorted = new long[] { 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void twoItems() {
+        long[] data = new long[] { 1, 2, 3, 4, 1, 1, 5, 5 };
+        long[] expectedSorted = new long[] { 1, 1, 5, 5, 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void threeItems() {
+        long[] data = new long[] { 1, 2, 3, 4, 1, 1, 5, 5, 1, 0, 2, 1 };
+        long[] expectedSorted = new long[] { 1, 0, 2, 1, 1, 1, 5, 5, 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
new file mode 100644
index 0000000..f5b0599
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
@@ -0,0 +1,261 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link WriteCache}.
+ */
+public class WriteCacheTest {
+
+    @Test
+    public void simple() throws Exception {
+        WriteCache cache = new WriteCache(10 * 1024);
+
+        ByteBuf entry1 = PooledByteBufAllocator.DEFAULT.buffer(1024);
+        ByteBufUtil.writeUtf8(entry1, "entry-1");
+        entry1.writerIndex(entry1.capacity());
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        cache.put(1, 1, entry1);
+
+        assertFalse(cache.isEmpty());
+        assertEquals(1, cache.count());
+        assertEquals(entry1.readableBytes(), cache.size());
+
+        assertEquals(entry1, cache.get(1, 1));
+        assertNull(cache.get(1, 2));
+        assertNull(cache.get(2, 1));
+
+        assertEquals(entry1, cache.getLastEntry(1));
+        assertEquals(null, cache.getLastEntry(2));
+
+        cache.clear();
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        entry1.release();
+        cache.close();
+    }
+
+    @Test
+    public void cacheFull() throws Exception {
+        int cacheSize = 10 * 1024;
+        int entrySize = 1024;
+        int entriesCount = cacheSize / entrySize;
+
+        WriteCache cache = new WriteCache(cacheSize);
+
+        ByteBuf entry = PooledByteBufAllocator.DEFAULT.buffer(entrySize);
+        entry.writerIndex(entry.capacity());
+
+        for (int i = 0; i < entriesCount; i++) {
+            assertTrue(cache.put(1, i, entry));
+        }
+
+        assertFalse(cache.put(1, 11, entry));
+
+        assertFalse(cache.isEmpty());
+        assertEquals(entriesCount, cache.count());
+        assertEquals(cacheSize, cache.size());
+
+        AtomicInteger findCount = new AtomicInteger(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(entriesCount, findCount.get());
+
+        cache.deleteLedger(1);
+
+        findCount.set(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(0, findCount.get());
+
+        entry.release();
+        cache.close();
+    }
+
+    @Test
+    public void testMultipleSegments() {
+        // Create cache with max size 1Mb and each segment is 16Kb
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writerIndex(entry.capacity());
+
+        for (int i = 0; i < 48; i++) {
+            cache.put(1, i, entry);
+        }
+
+        assertEquals(48, cache.count());
+        assertEquals(48 * 1024, cache.size());
+
+        cache.close();
+    }
+
+    @Test
+    public void testEmptyCache() {
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+        assertTrue(cache.isEmpty());
+
+        AtomicLong foundEntries = new AtomicLong();
+        cache.forEach((ledgerId, entryId, entry) -> {
+            foundEntries.incrementAndGet();
+        });
+
+        assertEquals(0, foundEntries.get());
+        cache.close();
+    }
+
+    @Test
+    public void testMultipleWriters() throws Exception {
+        // Create cache with max size 10Mb and each segment is 16Kb
+        WriteCache cache = new WriteCache(10 * 1024 * 1024, 16 * 1024);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        int numThreads = 10;
+        int entriesPerThread = 10 * 1024 / numThreads;
+
+        CyclicBarrier barrier = new CyclicBarrier(numThreads);
+        CountDownLatch latch = new CountDownLatch(numThreads);
+
+        for (int i = 0; i < numThreads; i++) {
+            int ledgerId = i;
+
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (InterruptedException | BrokenBarrierException e) {
+                    throw new RuntimeException(e);
+                }
+
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writerIndex(entry.capacity());
+
+                for (int entryId = 0; entryId < entriesPerThread; entryId++) {
+                    assertTrue(cache.put(ledgerId, entryId, entry));
+                }
+
+                latch.countDown();
+            });
+        }
+
+        // Wait for all tasks to be completed
+        latch.await();
+
+        // assertEquals(numThreads * entriesPerThread, cache.count());
+        assertEquals(cache.count() * 1024, cache.size());
+
+        // Verify entries by iterating over write cache
+        AtomicLong currentLedgerId = new AtomicLong(0);
+        AtomicLong currentEntryId = new AtomicLong(0);
+
+        cache.forEach((ledgerId, entryId, entry) -> {
+            assertEquals(currentLedgerId.get(), ledgerId);
+            assertEquals(currentEntryId.get(), entryId);
+
+            if (currentEntryId.incrementAndGet() == entriesPerThread) {
+                currentLedgerId.incrementAndGet();
+                currentEntryId.set(0);
+            }
+        });
+
+        cache.close();
+        executor.shutdown();
+    }
+
+    @Test
+    public void testLedgerDeletion() {
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writerIndex(entry.capacity());
+
+        for (long ledgerId = 0; ledgerId < 10; ledgerId++) {
+            for (int entryId = 0; entryId < 10; entryId++) {
+                cache.put(ledgerId, entryId, entry);
+            }
+        }
+
+        assertEquals(100, cache.count());
+        assertEquals(100 * 1024, cache.size());
+
+        cache.deleteLedger(5);
+
+        // Entries are not immediately deleted, just ignored on scan
+        assertEquals(100, cache.count());
+        assertEquals(100 * 1024, cache.size());
+
+        // Verify entries by iterating over write cache
+        AtomicLong currentLedgerId = new AtomicLong(0);
+        AtomicLong currentEntryId = new AtomicLong(0);
+
+        cache.forEach((ledgerId, entryId, e) -> {
+            assertEquals(currentLedgerId.get(), ledgerId);
+            assertEquals(currentEntryId.get(), entryId);
+
+            if (currentEntryId.incrementAndGet() == 10) {
+                currentLedgerId.incrementAndGet();
+                currentEntryId.set(0);
+
+                if (currentLedgerId.get() == 5) {
+                    // Ledger 5 was deleted
+                    currentLedgerId.incrementAndGet();
+                }
+            }
+        });
+
+        cache.close();
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.
