This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 271e6fa DbLedgerStorage -- Write cache
271e6fa is described below
commit 271e6faa3ace16c1ca52e55caac3d76d26d1f6ab
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Dec 13 16:47:17 2017 +0800
DbLedgerStorage -- Write cache
1st part of DbLedgerStorage related changes.
This PR introduces a `WriteCache` class that is used to store entries
before they get flushed on the entryLogs.
The key part is that it provides a way to iterate in order over the entries
and it is garbage-free at steady state.
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie
Guo <[email protected]>
This closes #828 from merlimat/db-ledger-storage-itemized
---
.../bookie/storage/ldb/ArrayGroupSort.java | 100 +++++++
.../bookkeeper/bookie/storage/ldb/WriteCache.java | 303 +++++++++++++++++++++
.../bookie/storage/ldb/package-info.java | 25 ++
.../bookie/storage/ldb/ArraySortGroupTest.java | 132 +++++++++
.../bookie/storage/ldb/WriteCacheTest.java | 261 ++++++++++++++++++
5 files changed, 821 insertions(+)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
new file mode 100644
index 0000000..719b33d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Sort an array of longs, grouping the items in tuples.
+ *
+ * <p>Group size decides how many longs are included in the tuples and key
size controls how many items to use for
+ * comparison.
+ */
+public class ArrayGroupSort {
+
+ private final int keySize;
+ private final int groupSize;
+
+ public ArrayGroupSort(int keySize, int groupSize) {
+ checkArgument(keySize > 0);
+ checkArgument(groupSize > 0);
+ checkArgument(keySize <= groupSize, "keySize need to be less or equal
the groupSize");
+ this.keySize = keySize;
+ this.groupSize = groupSize;
+ }
+
+ public void sort(long[] array) {
+ sort(array, 0, array.length);
+ }
+
+ public void sort(long[] array, int offset, int length) {
+ checkArgument(length % groupSize == 0, "Array length must be multiple
of groupSize");
+ quickSort(array, offset, (length + offset - groupSize));
+ }
+
+ ////// Private
+
+ private void quickSort(long array[], int low, int high) {
+ if (low < high) {
+ int pivotIdx = partition(array, low, high);
+ quickSort(array, low, pivotIdx - groupSize);
+ quickSort(array, pivotIdx + groupSize, high);
+ }
+ }
+
+ private int partition(long array[], int low, int high) {
+ int pivotIdx = high;
+ int i = low;
+
+ for (int j = low; j < high; j += groupSize) {
+ if (isLess(array, j, pivotIdx)) {
+ swap(array, j, i);
+ i += groupSize;
+ }
+ }
+
+ swap(array, i, high);
+ return i;
+ }
+
+ private void swap(long array[], int a, int b) {
+ long tmp;
+ for (int k = 0; k < groupSize; k++) {
+ tmp = array[a + k];
+ array[a + k] = array[b + k];
+ array[b + k] = tmp;
+ }
+ }
+
+ private boolean isLess(long array[], int idx1, int idx2) {
+ for (int i = 0; i < keySize; i++) {
+ long k1 = array[idx1 + i];
+ long k2 = array[idx2 + i];
+ if (k1 < k2) {
+ return true;
+ } else if (k1 > k2) {
+ return false;
+ }
+ }
+
+ return false;
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
new file mode 100644
index 0000000..1544b6d
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -0,0 +1,303 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Write cache implementation.
+ *
+ * <p>The write cache will allocate the requested size from direct memory and it
+ * will break it down into multiple segments.
+ *
+ * <p>The entries are appended in a common buffer and indexed through a hashmap,
+ * until the cache is cleared.
+ *
+ * <p>There is the possibility to iterate through the stored entries in an ordered
+ * way, by (ledgerId, entry).
+ */
+public class WriteCache implements Closeable {
+
+    /**
+     * Consumer that is used to scan the entire write cache.
+     */
+    public interface EntryConsumer {
+        void accept(long ledgerId, long entryId, ByteBuf entry);
+    }
+
+    // (ledgerId, entryId) -> (global offset in the cache, entry size)
+    private final ConcurrentLongLongPairHashMap index =
+            new ConcurrentLongLongPairHashMap(4096, 2 * Runtime.getRuntime().availableProcessors());
+
+    // ledgerId -> highest entryId stored in the cache for that ledger
+    private final ConcurrentLongLongHashMap lastEntryMap =
+            new ConcurrentLongLongHashMap(4096, 2 * Runtime.getRuntime().availableProcessors());
+
+    private final ByteBuf[] cacheSegments;
+    private final int segmentsCount;
+
+    private final long maxCacheSize;
+    private final int maxSegmentSize;
+    // A global offset is split in (segment index, offset inside the segment) using these two values
+    private final long segmentOffsetMask;
+    private final long segmentOffsetBits;
+
+    private final AtomicLong cacheSize = new AtomicLong(0);
+    private final AtomicLong cacheOffset = new AtomicLong(0);
+    private final LongAdder cacheCount = new LongAdder();
+
+    private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
+
+    /**
+     * Creates a write cache with the default maximum segment size of 1Gb.
+     *
+     * @param maxCacheSize total number of bytes the cache can hold
+     */
+    public WriteCache(long maxCacheSize) {
+        // Default maxSegmentSize set to 1Gb
+        this(maxCacheSize, 1 * 1024 * 1024 * 1024);
+    }
+
+    /**
+     * Creates a write cache and allocates all of its segments as direct buffers.
+     *
+     * @param maxCacheSize total number of bytes the cache can hold
+     * @param maxSegmentSize size of each segment, must be a positive power of two
+     * @throws IllegalArgumentException if {@code maxSegmentSize} is not a positive power of two
+     */
+    public WriteCache(long maxCacheSize, int maxSegmentSize) {
+        checkArgument(maxSegmentSize > 0);
+
+        long alignedMaxSegmentSize = alignToPowerOfTwo(maxSegmentSize);
+        checkArgument(maxSegmentSize == alignedMaxSegmentSize, "Max segment size needs to be in form of 2^n");
+
+        this.maxCacheSize = maxCacheSize;
+        this.maxSegmentSize = maxSegmentSize;
+        this.segmentOffsetMask = maxSegmentSize - 1;
+        // For maxSegmentSize == 2^k the local offset takes exactly k bits. Using k + 1 here would
+        // halve every segment index and make entries of adjacent segments overwrite each other.
+        this.segmentOffsetBits = 63 - Long.numberOfLeadingZeros(maxSegmentSize);
+
+        this.segmentsCount = 1 + (int) (maxCacheSize / maxSegmentSize);
+
+        this.cacheSegments = new ByteBuf[segmentsCount];
+
+        for (int i = 0; i < segmentsCount - 1; i++) {
+            // All intermediate segments will be full-size
+            cacheSegments[i] = Unpooled.directBuffer(maxSegmentSize, maxSegmentSize);
+        }
+
+        // The last segment holds the remainder and is empty when maxCacheSize is a multiple of maxSegmentSize
+        int lastSegmentSize = (int) (maxCacheSize % maxSegmentSize);
+        cacheSegments[segmentsCount - 1] = Unpooled.directBuffer(lastSegmentSize, lastSegmentSize);
+    }
+
+    /**
+     * Resets the counters, the write offset, the indexes and the set of deleted ledgers.
+     *
+     * <p>The segments are kept allocated and their content is simply overwritten by later writes.
+     */
+    public void clear() {
+        cacheSize.set(0L);
+        cacheOffset.set(0L);
+        cacheCount.reset();
+        index.clear();
+        lastEntryMap.clear();
+        deletedLedgers.clear();
+    }
+
+    /**
+     * Releases all the segments.
+     */
+    @Override
+    public void close() {
+        for (ByteBuf buf : cacheSegments) {
+            buf.release();
+        }
+    }
+
+    /**
+     * Copies the readable bytes of an entry into the cache and indexes it by (ledgerId, entryId).
+     *
+     * <p>The reader index of {@code entry} is not modified.
+     *
+     * @param ledgerId ledger the entry belongs to
+     * @param entryId id of the entry
+     * @param entry buffer whose readable bytes are copied
+     * @return true if the entry was stored, false if there was no room left for it
+     */
+    public boolean put(long ledgerId, long entryId, ByteBuf entry) {
+        int size = entry.readableBytes();
+
+        if (size > maxSegmentSize) {
+            // An entry never spans two segments, so this one can never be stored. Reject it right
+            // away instead of consuming offsets until the cache looks full.
+            return false;
+        }
+
+        // Align to 64 bytes so that different threads will not contend the same L1
+        // cache line
+        int alignedSize = align64(size);
+
+        long offset;
+        int localOffset;
+        int segmentIdx;
+
+        while (true) {
+            offset = cacheOffset.getAndAdd(alignedSize);
+            localOffset = (int) (offset & segmentOffsetMask);
+            segmentIdx = (int) (offset >>> segmentOffsetBits);
+
+            if ((offset + size) > maxCacheSize) {
+                // Cache is full
+                return false;
+            } else if (maxSegmentSize - localOffset < size) {
+                // If an entry is at the end of a segment, we need to get a new offset and try
+                // again in next segment
+                continue;
+            } else {
+                // Found a good offset
+                break;
+            }
+        }
+
+        cacheSegments[segmentIdx].setBytes(localOffset, entry, entry.readerIndex(), entry.readableBytes());
+
+        // Update last entryId for ledger. This logic is to handle writes for the same
+        // ledger coming out of order and from different thread, though in practice it
+        // should not happen and the compareAndSet should be always uncontended.
+        while (true) {
+            long currentLastEntryId = lastEntryMap.get(ledgerId);
+            if (currentLastEntryId > entryId) {
+                // A newer entry is already there
+                break;
+            }
+
+            if (lastEntryMap.compareAndSet(ledgerId, currentLastEntryId, entryId)) {
+                break;
+            }
+        }
+
+        // The entry becomes visible to readers only after its bytes have been copied
+        index.put(ledgerId, entryId, offset, size);
+        cacheCount.increment();
+        cacheSize.addAndGet(size);
+        return true;
+    }
+
+    /**
+     * Returns a copy of a cached entry.
+     *
+     * <p>Ledgers marked through {@link #deleteLedger(long)} are not filtered out here.
+     *
+     * @return a newly allocated buffer with the entry content, which the caller is expected to
+     *         release, or null if the entry is not in the cache
+     */
+    public ByteBuf get(long ledgerId, long entryId) {
+        LongPair result = index.get(ledgerId, entryId);
+        if (result == null) {
+            return null;
+        }
+
+        long offset = result.first;
+        int size = (int) result.second;
+        ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size, size);
+
+        int localOffset = (int) (offset & segmentOffsetMask);
+        int segmentIdx = (int) (offset >>> segmentOffsetBits);
+        entry.writeBytes(cacheSegments[segmentIdx], localOffset, size);
+        return entry;
+    }
+
+    /**
+     * Returns a copy of the entry with the highest entryId stored for a ledger.
+     *
+     * @return the entry, as returned by {@link #get(long, long)}, or null if the ledger has no entry in the cache
+     */
+    public ByteBuf getLastEntry(long ledgerId) {
+        long lastEntryId = lastEntryMap.get(ledgerId);
+        if (lastEntryId == -1) {
+            // Ledger not found in write cache
+            return null;
+        } else {
+            return get(ledgerId, lastEntryId);
+        }
+    }
+
+    /**
+     * Marks a ledger as deleted. Its entries stay in the cache but are skipped by
+     * {@link #forEach(EntryConsumer)} until the cache is cleared.
+     */
+    public void deleteLedger(long ledgerId) {
+        deletedLedgers.add(ledgerId);
+    }
+
+    // Index entries are flattened as (ledgerId, entryId, offset, length) and ordered by the first 2 items
+    private static final ArrayGroupSort groupSorter = new ArrayGroupSort(2, 4);
+
+    /**
+     * Passes every entry of the non-deleted ledgers to the consumer, ordered by (ledgerId, entryId).
+     *
+     * <p>The buffer handed to the consumer is a view over a cache segment and its reader/writer
+     * indexes are repositioned for every entry, so the consumer should not hold on to it after
+     * returning. Only one scan runs at a time, since the whole method holds an internal lock.
+     *
+     * @param consumer callback invoked once per entry
+     */
+    public void forEach(EntryConsumer consumer) {
+        sortedEntriesLock.lock();
+
+        try {
+            int entriesToSort = (int) index.size();
+            int arrayLen = entriesToSort * 4;
+            if (sortedEntries == null || sortedEntries.length < arrayLen) {
+                // Over-allocate so that the array can be reused by later scans
+                sortedEntries = new long[(int) (arrayLen * 2)];
+            }
+
+            long startTime = MathUtils.nowInNano();
+
+            sortedEntriesIdx = 0;
+            index.forEach((ledgerId, entryId, offset, length) -> {
+                if (deletedLedgers.contains(ledgerId)) {
+                    // Ignore deleted ledgers
+                    return;
+                }
+
+                sortedEntries[sortedEntriesIdx] = ledgerId;
+                sortedEntries[sortedEntriesIdx + 1] = entryId;
+                sortedEntries[sortedEntriesIdx + 2] = offset;
+                sortedEntries[sortedEntriesIdx + 3] = length;
+                sortedEntriesIdx += 4;
+            });
+
+            if (log.isDebugEnabled()) {
+                log.debug("iteration took {} ms", MathUtils.elapsedNanos(startTime) / 1e6);
+            }
+            startTime = MathUtils.nowInNano();
+
+            // Sort entries by (ledgerId, entryId) maintaining the 4 items groups
+            groupSorter.sort(sortedEntries, 0, sortedEntriesIdx);
+            if (log.isDebugEnabled()) {
+                log.debug("sorting {} ms", (MathUtils.elapsedNanos(startTime) / 1e6));
+            }
+            startTime = MathUtils.nowInNano();
+
+            // Use private views of the segments, so that moving their indexes does not affect the segments
+            ByteBuf[] entrySegments = new ByteBuf[segmentsCount];
+            for (int i = 0; i < segmentsCount; i++) {
+                entrySegments[i] = cacheSegments[i].slice(0, cacheSegments[i].capacity());
+            }
+
+            for (int i = 0; i < sortedEntriesIdx; i += 4) {
+                long ledgerId = sortedEntries[i];
+                long entryId = sortedEntries[i + 1];
+                long offset = sortedEntries[i + 2];
+                long length = sortedEntries[i + 3];
+
+                int localOffset = (int) (offset & segmentOffsetMask);
+                int segmentIdx = (int) (offset >>> segmentOffsetBits);
+                ByteBuf entry = entrySegments[segmentIdx];
+                entry.setIndex(localOffset, localOffset + (int) length);
+                consumer.accept(ledgerId, entryId, entry);
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("entry log adding {} ms", MathUtils.elapsedNanos(startTime) / 1e6);
+            }
+        } finally {
+            sortedEntriesLock.unlock();
+        }
+    }
+
+    /**
+     * @return the sum of the sizes of the stored entries, in bytes, without the alignment padding
+     */
+    public long size() {
+        return cacheSize.get();
+    }
+
+    /**
+     * @return the number of entries stored since the cache was created or last cleared
+     */
+    public long count() {
+        return cacheCount.sum();
+    }
+
+    /**
+     * @return true if the total size of the stored entries is zero
+     */
+    public boolean isEmpty() {
+        return cacheSize.get() == 0L;
+    }
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    /** Rounds {@code size} up to the next multiple of 64. */
+    static int align64(int size) {
+        return (size + 64 - 1) & ALIGN_64_MASK;
+    }
+
+    /**
+     * Returns the smallest power of two that is greater or equal to {@code n}, for a positive
+     * {@code n}. Integer arithmetic is used to keep the result exact.
+     */
+    private static long alignToPowerOfTwo(long n) {
+        return 1L << (64 - Long.numberOfLeadingZeros(n - 1));
+    }
+
+    // Scratch state of forEach(), guarded by sortedEntriesLock
+    private final ReentrantLock sortedEntriesLock = new ReentrantLock();
+    private long[] sortedEntries;
+    private int sortedEntriesIdx;
+
+    private static final Logger log = LoggerFactory.getLogger(WriteCache.class);
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
new file mode 100644
index 0000000..6c6cd8c
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
@@ -0,0 +1,25 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Classes related to DB based ledger storage.
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
\ No newline at end of file
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
new file mode 100644
index 0000000..52fdf84
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link ArrayGroupSort}.
+ */
+public class ArraySortGroupTest {
+
+    /**
+     * Sorts {@code input} in place as groups of 4 longs keyed by their first 2 longs and verifies
+     * the resulting array.
+     */
+    private static void assertSortsTo(long[] expectedSorted, long[] input) {
+        new ArrayGroupSort(2, 4).sort(input);
+        assertArrayEquals(expectedSorted, input);
+    }
+
+    @Test
+    public void simple() {
+        long[] unsorted = { //
+                1, 2, 3, 4, //
+                5, 6, 3, 1, //
+                4, 8, 1, 2, //
+                4, 5, 12, 10, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                3, 3, 3, 3, //
+        };
+
+        long[] sorted = { //
+                1, 2, 3, 4, //
+                3, 3, 3, 3, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                4, 5, 12, 10, //
+                4, 8, 1, 2, //
+                5, 6, 3, 1, //
+        };
+
+        assertSortsTo(sorted, unsorted);
+    }
+
+    // The key cannot span more longs than the group itself
+    @Test(expected = IllegalArgumentException.class)
+    public void keySmallerThanTotalSize() {
+        new ArrayGroupSort(3, 2);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void negativeKeySize() {
+        new ArrayGroupSort(-1, 2);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void negativeTotalSize() {
+        new ArrayGroupSort(1, -1);
+    }
+
+    // 4 longs cannot be split in groups of 3
+    @Test(expected = IllegalArgumentException.class)
+    public void arraySizeIsNotMultiple() {
+        long[] fourLongs = { 1, 2, 3, 4 };
+        new ArrayGroupSort(1, 3).sort(fourLongs);
+    }
+
+    // 2 longs are not even enough for a single group of 3
+    @Test(expected = IllegalArgumentException.class)
+    public void arraySizeIsShorterThanRequired() {
+        long[] twoLongs = { 1, 2 };
+        new ArrayGroupSort(1, 3).sort(twoLongs);
+    }
+
+    @Test
+    public void emptyArray() {
+        assertSortsTo(new long[] {}, new long[] {});
+    }
+
+    @Test
+    public void singleItem() {
+        assertSortsTo(new long[] { 1, 2, 3, 4 }, new long[] { 1, 2, 3, 4 });
+    }
+
+    @Test
+    public void twoItems() {
+        long[] unsorted = { 1, 2, 3, 4, 1, 1, 5, 5 };
+        long[] sorted = { 1, 1, 5, 5, 1, 2, 3, 4 };
+
+        assertSortsTo(sorted, unsorted);
+    }
+
+    @Test
+    public void threeItems() {
+        long[] unsorted = { 1, 2, 3, 4, 1, 1, 5, 5, 1, 0, 2, 1 };
+        long[] sorted = { 1, 0, 2, 1, 1, 1, 5, 5, 1, 2, 3, 4 };
+
+        assertSortsTo(sorted, unsorted);
+    }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
new file mode 100644
index 0000000..f5b0599
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
@@ -0,0 +1,261 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link WriteCache}.
+ */
+public class WriteCacheTest {
+
+    /**
+     * Stores a single entry and checks counters, lookups and {@code clear()}.
+     */
+    @Test
+    public void simple() throws Exception {
+        WriteCache cache = new WriteCache(10 * 1024);
+
+        ByteBuf entry1 = PooledByteBufAllocator.DEFAULT.buffer(1024);
+        ByteBufUtil.writeUtf8(entry1, "entry-1");
+        entry1.writerIndex(entry1.capacity());
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        cache.put(1, 1, entry1);
+
+        assertFalse(cache.isEmpty());
+        assertEquals(1, cache.count());
+        assertEquals(entry1.readableBytes(), cache.size());
+
+        // The cache hands out copies of the entries, which have to be released by the test
+        ByteBuf found = cache.get(1, 1);
+        assertEquals(entry1, found);
+        found.release();
+        assertNull(cache.get(1, 2));
+        assertNull(cache.get(2, 1));
+
+        ByteBuf last = cache.getLastEntry(1);
+        assertEquals(entry1, last);
+        last.release();
+        assertNull(cache.getLastEntry(2));
+
+        cache.clear();
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        entry1.release();
+        cache.close();
+    }
+
+    /**
+     * Fills the cache completely, checks that further writes are rejected and that a deleted
+     * ledger is skipped when scanning.
+     */
+    @Test
+    public void cacheFull() throws Exception {
+        int cacheSize = 10 * 1024;
+        int entrySize = 1024;
+        int entriesCount = cacheSize / entrySize;
+
+        WriteCache cache = new WriteCache(cacheSize);
+
+        ByteBuf entry = PooledByteBufAllocator.DEFAULT.buffer(entrySize);
+        entry.writerIndex(entry.capacity());
+
+        for (int i = 0; i < entriesCount; i++) {
+            assertTrue(cache.put(1, i, entry));
+        }
+
+        assertFalse(cache.put(1, 11, entry));
+
+        assertFalse(cache.isEmpty());
+        assertEquals(entriesCount, cache.count());
+        assertEquals(cacheSize, cache.size());
+
+        AtomicInteger findCount = new AtomicInteger(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(entriesCount, findCount.get());
+
+        cache.deleteLedger(1);
+
+        findCount.set(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(0, findCount.get());
+
+        entry.release();
+        cache.close();
+    }
+
+    @Test
+    public void testMultipleSegments() {
+        // Create cache with max size 1Mb and each segment is 16Kb
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writerIndex(entry.capacity());
+
+        // 48 entries of 1Kb do not fit in a single 16Kb segment
+        for (int i = 0; i < 48; i++) {
+            cache.put(1, i, entry);
+        }
+
+        assertEquals(48, cache.count());
+        assertEquals(48 * 1024, cache.size());
+
+        cache.close();
+    }
+
+    @Test
+    public void testEmptyCache() {
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+        assertTrue(cache.isEmpty());
+
+        AtomicLong foundEntries = new AtomicLong();
+        cache.forEach((ledgerId, entryId, entry) -> {
+            foundEntries.incrementAndGet();
+        });
+
+        assertEquals(0, foundEntries.get());
+        cache.close();
+    }
+
+    /**
+     * Writes from several threads at the same time, one ledger per thread, and checks that the
+     * scan returns all the entries in (ledgerId, entryId) order.
+     */
+    @Test
+    public void testMultipleWriters() throws Exception {
+        // Create cache with max size 10Mb and each segment is 16Kb
+        WriteCache cache = new WriteCache(10 * 1024 * 1024, 16 * 1024);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        int numThreads = 10;
+        int entriesPerThread = 10 * 1024 / numThreads;
+
+        CyclicBarrier barrier = new CyclicBarrier(numThreads);
+        CountDownLatch latch = new CountDownLatch(numThreads);
+        AtomicInteger failedWriters = new AtomicInteger(0);
+
+        for (int i = 0; i < numThreads; i++) {
+            int ledgerId = i;
+
+            executor.submit(() -> {
+                boolean completed = false;
+                try {
+                    barrier.await();
+
+                    ByteBuf entry = Unpooled.buffer(1024);
+                    entry.writerIndex(entry.capacity());
+
+                    for (int entryId = 0; entryId < entriesPerThread; entryId++) {
+                        assertTrue(cache.put(ledgerId, entryId, entry));
+                    }
+
+                    completed = true;
+                } catch (InterruptedException | BrokenBarrierException e) {
+                    throw new RuntimeException(e);
+                } finally {
+                    // Always count down, otherwise a failure in this task would leave the test
+                    // thread waiting forever on the latch
+                    if (!completed) {
+                        failedWriters.incrementAndGet();
+                    }
+                    latch.countDown();
+                }
+            });
+        }
+
+        // Wait for all tasks to be completed
+        latch.await();
+
+        // Failures in the writer threads are not visible to JUnit, report them from here
+        assertEquals("Some writer threads failed", 0, failedWriters.get());
+
+        // assertEquals(numThreads * entriesPerThread, cache.count());
+        assertEquals(cache.count() * 1024, cache.size());
+
+        // Verify entries by iterating over write cache
+        AtomicLong currentLedgerId = new AtomicLong(0);
+        AtomicLong currentEntryId = new AtomicLong(0);
+
+        cache.forEach((ledgerId, entryId, entry) -> {
+            assertEquals(currentLedgerId.get(), ledgerId);
+            assertEquals(currentEntryId.get(), entryId);
+
+            if (currentEntryId.incrementAndGet() == entriesPerThread) {
+                currentLedgerId.incrementAndGet();
+                currentEntryId.set(0);
+            }
+        });
+
+        cache.close();
+        executor.shutdown();
+    }
+
+    @Test
+    public void testLedgerDeletion() {
+        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writerIndex(entry.capacity());
+
+        for (long ledgerId = 0; ledgerId < 10; ledgerId++) {
+            for (int entryId = 0; entryId < 10; entryId++) {
+                cache.put(ledgerId, entryId, entry);
+            }
+        }
+
+        assertEquals(100, cache.count());
+        assertEquals(100 * 1024, cache.size());
+
+        cache.deleteLedger(5);
+
+        // Entries are not immediately deleted, just ignored on scan
+        assertEquals(100, cache.count());
+        assertEquals(100 * 1024, cache.size());
+
+        // Verify entries by iterating over write cache
+        AtomicLong currentLedgerId = new AtomicLong(0);
+        AtomicLong currentEntryId = new AtomicLong(0);
+
+        cache.forEach((ledgerId, entryId, e) -> {
+            assertEquals(currentLedgerId.get(), ledgerId);
+            assertEquals(currentEntryId.get(), entryId);
+
+            if (currentEntryId.incrementAndGet() == 10) {
+                currentLedgerId.incrementAndGet();
+                currentEntryId.set(0);
+
+                if (currentLedgerId.get() == 5) {
+                    // Ledger 5 was deleted
+                    currentLedgerId.incrementAndGet();
+                }
+            }
+        });
+
+        cache.close();
+    }
+
+}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].