This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new a1bdcc9eae Minor improvements to block cache code and tests (#3063)
a1bdcc9eae is described below
commit a1bdcc9eaee537223a42ccfab631e73488322637
Author: EdColeman <[email protected]>
AuthorDate: Thu Nov 3 15:12:17 2022 -0400
Minor improvements to block cache code and tests (#3063)
Incorporate some improvements borrowed from PR #1333 to
the block cache code and its tests.
---
.../core/file/blockfile/cache/lru/CachedBlock.java | 5 +-
.../file/blockfile/cache/lru/CachedBlockQueue.java | 23 ++---
.../file/blockfile/cache/TestCachedBlockQueue.java | 115 ++++++++++++++++-----
.../file/blockfile/cache/TestLruBlockCache.java | 20 ++--
4 files changed, 106 insertions(+), 57 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
index 4a4e6b0f90..1e7a11bde2 100644
---
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
+++
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
@@ -55,7 +55,7 @@ public class CachedBlock implements HeapSize,
Comparable<CachedBlock> {
MEMORY
}
- private byte[] buffer;
+ private final byte[] buffer;
private final String blockName;
private volatile long accessTime;
private volatile long recordedSize;
@@ -98,8 +98,7 @@ public class CachedBlock implements HeapSize,
Comparable<CachedBlock> {
@Override
public boolean equals(Object obj) {
- return this == obj
- || (obj != null && obj instanceof CachedBlock &&
compareTo((CachedBlock) obj) == 0);
+ return this == obj || (obj instanceof CachedBlock &&
compareTo((CachedBlock) obj) == 0);
}
@Override
diff --git
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlockQueue.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlockQueue.java
index 89565170c0..e1db00521a 100644
---
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlockQueue.java
+++
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlockQueue.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.core.file.blockfile.cache.lru;
import java.util.LinkedList;
+import java.util.Objects;
import java.util.PriorityQueue;
/**
@@ -36,10 +37,10 @@ import java.util.PriorityQueue;
*/
public class CachedBlockQueue implements HeapSize {
- private PriorityQueue<CachedBlock> queue;
+ private final PriorityQueue<CachedBlock> queue;
private long heapSize;
- private long maxSize;
+ private final long maxSize;
/**
* @param maxSize
@@ -72,7 +73,8 @@ public class CachedBlockQueue implements HeapSize {
queue.add(cb);
heapSize += cb.heapSize();
} else {
- CachedBlock head = queue.peek();
+ CachedBlock head =
+ Objects.requireNonNull(queue.peek(), "No cached blocks available
from queue");
if (cb.compareTo(head) > 0) {
heapSize += cb.heapSize();
heapSize -= head.heapSize();
@@ -96,20 +98,7 @@ public class CachedBlockQueue implements HeapSize {
while (!queue.isEmpty()) {
blocks.addFirst(queue.poll());
}
- return blocks.toArray(new CachedBlock[blocks.size()]);
- }
-
- /**
- * Get a sorted List of all elements in this queue, in descending order.
- *
- * @return list of cached elements in descending order
- */
- public LinkedList<CachedBlock> getList() {
- LinkedList<CachedBlock> blocks = new LinkedList<>();
- while (!queue.isEmpty()) {
- blocks.addFirst(queue.poll());
- }
- return blocks;
+ return blocks.toArray(new CachedBlock[0]);
}
/**
diff --git
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestCachedBlockQueue.java
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestCachedBlockQueue.java
index bbfa02cb66..7739c0f247 100644
---
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestCachedBlockQueue.java
+++
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestCachedBlockQueue.java
@@ -20,26 +20,77 @@ package org.apache.accumulo.core.file.blockfile.cache;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlockQueue;
import org.junit.jupiter.api.Test;
public class TestCachedBlockQueue {
+ @Test
+ public void testLargeBlock() {
+ CachedBlockQueue queue = new CachedBlockQueue(10000L, 1000L);
+ CachedBlock cb1 = new CachedBlock(10001L, "cb1", 1L);
+ queue.add(cb1);
+
+ List<org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlock> blocks
= getList(queue);
+ assertEquals("cb1", Objects.requireNonNull(blocks.get(0)).getName());
+ }
@Test
- public void testQueue() {
+ public void testAddNewerBlock() {
+ CachedBlockQueue queue = new CachedBlockQueue(10000L, 1000L);
+
+ AtomicLong sum = new AtomicLong();
+
+ CachedBlock cb1 = new CachedBlock(5000L, "cb1", 1L);
+ cb1.recordSize(sum);
+ CachedBlock cb2 = new CachedBlock(5000, "cb2", 2L);
+ cb2.recordSize(sum);
+ CachedBlock cb3 = new CachedBlock(5000, "cb3", 3L);
+ cb3.recordSize(sum);
+
+ queue.add(cb1);
+ queue.add(cb2);
+ queue.add(cb3);
+ List<org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlock> blocks
= getList(queue);
+
+ assertEquals(2, blocks.size());
+
+ long expectedSize = cb1.heapSize() + cb2.heapSize();
+ assertEquals(expectedSize, queue.heapSize());
+ assertEquals(expectedSize, sum.get() - cb3.heapSize());
+
+ assertEquals(List.of("cb1", "cb2"),
+ blocks.stream().map(cb -> cb.getName()).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testQueue() {
+ AtomicLong sum = new AtomicLong();
CachedBlock cb1 = new CachedBlock(1000, "cb1", 1);
+ cb1.recordSize(sum);
CachedBlock cb2 = new CachedBlock(1500, "cb2", 2);
+ cb2.recordSize(sum);
CachedBlock cb3 = new CachedBlock(1000, "cb3", 3);
+ cb3.recordSize(sum);
CachedBlock cb4 = new CachedBlock(1500, "cb4", 4);
+ cb4.recordSize(sum);
CachedBlock cb5 = new CachedBlock(1000, "cb5", 5);
+ cb5.recordSize(sum);
CachedBlock cb6 = new CachedBlock(1750, "cb6", 6);
+ cb6.recordSize(sum);
CachedBlock cb7 = new CachedBlock(1000, "cb7", 7);
+ cb7.recordSize(sum);
CachedBlock cb8 = new CachedBlock(1500, "cb8", 8);
+ cb8.recordSize(sum);
CachedBlock cb9 = new CachedBlock(1000, "cb9", 9);
+ cb9.recordSize(sum);
CachedBlock cb10 = new CachedBlock(1500, "cb10", 10);
+ cb10.recordSize(sum);
CachedBlockQueue queue = new CachedBlockQueue(10000, 1000);
@@ -58,34 +109,42 @@ public class TestCachedBlockQueue {
long expectedSize = cb1.heapSize() + cb2.heapSize() + cb3.heapSize() +
cb4.heapSize()
+ cb5.heapSize() + cb6.heapSize() + cb7.heapSize() + cb8.heapSize();
- assertEquals(queue.heapSize(), expectedSize);
+ assertEquals(expectedSize, queue.heapSize());
+ assertEquals(expectedSize, sum.get() - cb9.heapSize() - cb10.heapSize());
- LinkedList<org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlock>
blocks =
- queue.getList();
- assertEquals(blocks.poll().getName(), "cb1");
- assertEquals(blocks.poll().getName(), "cb2");
- assertEquals(blocks.poll().getName(), "cb3");
- assertEquals(blocks.poll().getName(), "cb4");
- assertEquals(blocks.poll().getName(), "cb5");
- assertEquals(blocks.poll().getName(), "cb6");
- assertEquals(blocks.poll().getName(), "cb7");
- assertEquals(blocks.poll().getName(), "cb8");
+ List<org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlock> blocks
= getList(queue);
+ assertEquals(List.of("cb1", "cb2", "cb3", "cb4", "cb5", "cb6", "cb7",
"cb8"),
+ blocks.stream().map(cb -> cb.getName()).collect(Collectors.toList()));
}
@Test
public void testQueueSmallBlockEdgeCase() {
+ AtomicLong sum = new AtomicLong();
CachedBlock cb1 = new CachedBlock(1000, "cb1", 1);
+ cb1.recordSize(sum);
CachedBlock cb2 = new CachedBlock(1500, "cb2", 2);
+ cb2.recordSize(sum);
CachedBlock cb3 = new CachedBlock(1000, "cb3", 3);
+ cb3.recordSize(sum);
CachedBlock cb4 = new CachedBlock(1500, "cb4", 4);
+ cb4.recordSize(sum);
CachedBlock cb5 = new CachedBlock(1000, "cb5", 5);
+ cb5.recordSize(sum);
CachedBlock cb6 = new CachedBlock(1750, "cb6", 6);
+ cb6.recordSize(sum);
CachedBlock cb7 = new CachedBlock(1000, "cb7", 7);
+ cb7.recordSize(sum);
CachedBlock cb8 = new CachedBlock(1500, "cb8", 8);
+ cb8.recordSize(sum);
CachedBlock cb9 = new CachedBlock(1000, "cb9", 9);
+ cb9.recordSize(sum);
CachedBlock cb10 = new CachedBlock(1500, "cb10", 10);
+ cb10.recordSize(sum);
+
+ // validate that sum was not improperly added to heapSize in recordSize
method.
+ assertEquals(cb3.heapSize(), cb7.heapSize());
CachedBlockQueue queue = new CachedBlockQueue(10000, 1000);
@@ -103,7 +162,7 @@ public class TestCachedBlockQueue {
CachedBlock cb0 = new CachedBlock(10 + CachedBlock.PER_BLOCK_OVERHEAD,
"cb0", 0);
queue.add(cb0);
- // This is older so we must include it, but it will not end up kicking
+ // This is older, so we must include it, but it will not end up kicking
// anything out because (heapSize - cb8.heapSize + cb0.heapSize < maxSize)
// and we must always maintain heapSize >= maxSize once we achieve it.
@@ -111,20 +170,13 @@ public class TestCachedBlockQueue {
long expectedSize = cb1.heapSize() + cb2.heapSize() + cb3.heapSize() +
cb4.heapSize()
+ cb5.heapSize() + cb6.heapSize() + cb7.heapSize() + cb8.heapSize() +
cb0.heapSize();
- assertEquals(queue.heapSize(), expectedSize);
+ assertEquals(expectedSize, queue.heapSize());
+ assertEquals(expectedSize, sum.get() - cb9.heapSize() - cb10.heapSize());
- LinkedList<org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlock>
blocks =
- queue.getList();
- assertEquals(blocks.poll().getName(), "cb0");
- assertEquals(blocks.poll().getName(), "cb1");
- assertEquals(blocks.poll().getName(), "cb2");
- assertEquals(blocks.poll().getName(), "cb3");
- assertEquals(blocks.poll().getName(), "cb4");
- assertEquals(blocks.poll().getName(), "cb5");
- assertEquals(blocks.poll().getName(), "cb6");
- assertEquals(blocks.poll().getName(), "cb7");
- assertEquals(blocks.poll().getName(), "cb8");
+ List<org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlock> blocks
= getList(queue);
+ assertEquals(List.of("cb0", "cb1", "cb2", "cb3", "cb4", "cb5", "cb6",
"cb7", "cb8"),
+ blocks.stream().map(cb -> cb.getName()).collect(Collectors.toList()));
}
private static class CachedBlock
@@ -133,4 +185,15 @@ public class TestCachedBlockQueue {
super(name, new byte[(int) (heapSize - CachedBlock.PER_BLOCK_OVERHEAD)],
accessTime, false);
}
}
+
+ /**
+ * Get a sorted List of all elements in this queue, in descending order.
+ *
+ * @return list of cached elements in descending order
+ */
+ private List<org.apache.accumulo.core.file.blockfile.cache.lru.CachedBlock>
+ getList(final CachedBlockQueue queue) {
+ return List.of(queue.get());
+ }
+
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
index 93f2c371fe..b6b0ea3962 100644
---
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
+++
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.core.file.blockfile.cache;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -25,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.security.SecureRandom;
-import java.util.Arrays;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -228,7 +228,7 @@ public class TestLruBlockCache {
assertNull(cache.getBlock(blocks[0].blockName));
assertNull(cache.getBlock(blocks[1].blockName));
for (int i = 2; i < blocks.length; i++) {
-
assertTrue(Arrays.equals(cache.getBlock(blocks[i].blockName).getBuffer(),
blocks[i].buf));
+ assertArrayEquals(cache.getBlock(blocks[i].blockName).getBuffer(),
blocks[i].buf);
}
manager.stop();
}
@@ -260,7 +260,7 @@ public class TestLruBlockCache {
for (Block block : multiBlocks) {
cache.cacheBlock(block.blockName, block.buf);
expectedCacheSize += block.heapSize();
- assertTrue(Arrays.equals(cache.getBlock(block.blockName).getBuffer(),
block.buf));
+ assertArrayEquals(cache.getBlock(block.blockName).getBuffer(),
block.buf);
}
// Add the single blocks (no get)
@@ -295,10 +295,8 @@ public class TestLruBlockCache {
// And all others to be cached
for (int i = 1; i < 4; i++) {
-
assertTrue(Arrays.equals(cache.getBlock(singleBlocks[i].blockName).getBuffer(),
- singleBlocks[i].buf));
- assertTrue(
- Arrays.equals(cache.getBlock(multiBlocks[i].blockName).getBuffer(),
multiBlocks[i].buf));
+ assertArrayEquals(cache.getBlock(singleBlocks[i].blockName).getBuffer(),
singleBlocks[i].buf);
+ assertArrayEquals(cache.getBlock(multiBlocks[i].blockName).getBuffer(),
multiBlocks[i].buf);
}
manager.stop();
}
@@ -517,8 +515,8 @@ public class TestLruBlockCache {
long roughBlockSize = maxSize / numBlocks;
int numEntries = (int) Math.ceil((1.2) * maxSize / roughBlockSize);
long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
ClassSize.CONCURRENT_HASHMAP
- + (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY)
- + (LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL
+ + ((long) numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+ + ((long) LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL
* ClassSize.CONCURRENT_HASHMAP_SEGMENT);
long negateBlockSize = totalOverhead / numEntries;
negateBlockSize += CachedBlock.PER_BLOCK_OVERHEAD;
@@ -529,8 +527,8 @@ public class TestLruBlockCache {
long roughBlockSize = maxSize / numBlocks;
int numEntries = (int) Math.ceil((1.2) * maxSize / roughBlockSize);
long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
ClassSize.CONCURRENT_HASHMAP
- + (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY)
- + (LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL
+ + ((long) numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+ + ((long) LruBlockCacheConfiguration.DEFAULT_CONCURRENCY_LEVEL
* ClassSize.CONCURRENT_HASHMAP_SEGMENT);
long negateBlockSize = totalOverhead / numEntries;
negateBlockSize += CachedBlock.PER_BLOCK_OVERHEAD;