This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new a2d508a8fb2 HBASE-28942 Purge all netty 3 dependencies by default (#6406)
a2d508a8fb2 is described below
commit a2d508a8fb21815444354ea2a95bee86a6b6f64a
Author: Duo Zhang <[email protected]>
AuthorDate: Wed Oct 30 22:04:35 2024 +0800
HBASE-28942 Purge all netty 3 dependencies by default (#6406)
Signed-off-by: Nick Dimiduk <[email protected]>
---
hbase-external-blockcache/pom.xml | 11 +-
.../hadoop/hbase/io/hfile/MemcachedBlockCache.java | 8 +-
.../hbase/io/hfile/TestMemcachedBlockCache.java | 155 ++++++++++++---------
pom.xml | 2 +-
4 files changed, 103 insertions(+), 73 deletions(-)
diff --git a/hbase-external-blockcache/pom.xml
b/hbase-external-blockcache/pom.xml
index 2a1e62a185a..1ea454d8c07 100644
--- a/hbase-external-blockcache/pom.xml
+++ b/hbase-external-blockcache/pom.xml
@@ -91,12 +91,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-zookeeper</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
@@ -104,9 +98,8 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.thimbleware.jmemcached</groupId>
- <artifactId>jmemcached-core</artifactId>
- <version>1.0.0</version>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git
a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index 2e8811e93a7..efb8b97bccb 100644
---
a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++
b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.spy.memcached.CachedData;
+import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
@@ -122,11 +123,16 @@ public class MemcachedBlockCache implements BlockCache {
serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
}
- client = new MemcachedClient(builder.build(), serverAddresses);
+ client = createMemcachedClient(builder.build(), serverAddresses);
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
STAT_THREAD_PERIOD,
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}
+ protected MemcachedClient createMemcachedClient(ConnectionFactory factory,
+ List<InetSocketAddress> serverAddresses) throws IOException {
+ return new MemcachedClient(factory, serverAddresses);
+ }
+
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean
inMemory) {
cacheBlock(cacheKey, buf);
diff --git
a/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java
b/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java
index 1e0cdf78c4d..96ff85e8414 100644
---
a/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java
+++
b/hbase-external-blockcache/src/test/java/org/apache/hadoop/hbase/io/hfile/TestMemcachedBlockCache.java
@@ -18,26 +18,39 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-import com.thimbleware.jmemcached.CacheElement;
-import com.thimbleware.jmemcached.CacheImpl;
-import com.thimbleware.jmemcached.Key;
-import com.thimbleware.jmemcached.LocalCacheElement;
-import com.thimbleware.jmemcached.MemCacheDaemon;
-import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap;
-import
com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap.EvictionPolicy;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadLocalRandom;
+import net.spy.memcached.CachedData;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.OperationFuture;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationState;
+import net.spy.memcached.ops.OperationStatus;
+import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -49,76 +62,94 @@ public class TestMemcachedBlockCache {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMemcachedBlockCache.class);
- static MemCacheDaemon<? extends CacheElement> MEMCACHED;
- static MemcachedBlockCache CACHE;
+ private MemcachedBlockCache cache;
- @Before
- public void before() throws Exception {
- MEMCACHED.getCache().flush_all();
- assertEquals("Memcache is not empty",
MEMCACHED.getCache().getCurrentItems(), 0);
- }
+ private ConcurrentMap<String, CachedData> backingMap;
- @BeforeClass
- public static void setup() throws Exception {
- int port = HBaseTestingUtil.randomFreePort();
- MEMCACHED = createDaemon(port);
+ @Before
+ public void setup() throws Exception {
+ int port = ThreadLocalRandom.current().nextInt(1024, 65536);
Configuration conf = new Configuration();
conf.set("hbase.cache.memcached.servers", "localhost:" + port);
- CACHE = new MemcachedBlockCache(conf);
- }
+ backingMap = new ConcurrentHashMap<>();
+ cache = new MemcachedBlockCache(conf) {
- @AfterClass
- public static void tearDown() throws Exception {
- if (MEMCACHED != null) {
- MEMCACHED.stop();
- }
+ private <T> OperationFuture<T> createFuture(String key, long opTimeout,
T result) {
+ OperationFuture<T> future =
+ new OperationFuture<>(key, new CountDownLatch(0), opTimeout,
ForkJoinPool.commonPool());
+ Operation op = mock(Operation.class);
+ when(op.getState()).thenReturn(OperationState.COMPLETE);
+ future.setOperation(op);
+ future.set(result, new OperationStatus(true, ""));
+
+ return future;
+ }
+
+ @Override
+ protected MemcachedClient createMemcachedClient(ConnectionFactory
factory,
+ List<InetSocketAddress> serverAddresses) throws IOException {
+ assertEquals(FailureMode.Redistribute, factory.getFailureMode());
+ assertTrue(factory.isDaemon());
+ assertFalse(factory.useNagleAlgorithm());
+ assertEquals(MAX_SIZE, factory.getReadBufSize());
+ assertEquals(1, serverAddresses.size());
+ assertEquals("localhost", serverAddresses.get(0).getHostName());
+ assertEquals(port, serverAddresses.get(0).getPort());
+ MemcachedClient client = mock(MemcachedClient.class);
+ when(client.set(anyString(), anyInt(), any(), any())).then(inv -> {
+ String key = inv.getArgument(0);
+ HFileBlock block = inv.getArgument(2);
+ Transcoder<HFileBlock> tc = inv.getArgument(3);
+ CachedData cd = tc.encode(block);
+ backingMap.put(key, cd);
+ return createFuture(key, factory.getOperationTimeout(), true);
+ });
+ when(client.delete(anyString())).then(inv -> {
+ String key = inv.getArgument(0);
+ backingMap.remove(key);
+ return createFuture(key, factory.getOperationTimeout(), true);
+ });
+ when(client.get(anyString(), any())).then(inv -> {
+ String key = inv.getArgument(0);
+ Transcoder<HFileBlock> tc = inv.getArgument(1);
+ CachedData cd = backingMap.get(key);
+ return tc.decode(cd);
+ });
+ return client;
+ }
+ };
}
@Test
public void testCache() throws Exception {
- final int NUM_BLOCKS = 10;
+ final int numBlocks = 10;
HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE,
NUM_BLOCKS);
- for (int i = 0; i < NUM_BLOCKS; i++) {
- CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
+ CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE,
numBlocks);
+ for (int i = 0; i < numBlocks; i++) {
+ cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
+ }
+ Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() ==
numBlocks);
+ for (int i = 0; i < numBlocks; i++) {
+ HFileBlock actual = (HFileBlock)
cache.getBlock(blocks[i].getBlockName(), false, false, true);
+ HFileBlock expected = blocks[i].getBlock();
+ assertEquals(expected.getBlockType(), actual.getBlockType());
+ assertEquals(expected.getSerializedLength(),
actual.getSerializedLength());
}
- Waiter.waitFor(new Configuration(), 10000,
- () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
}
@Test
public void testEviction() throws Exception {
- final int NUM_BLOCKS = 10;
+ final int numBlocks = 10;
HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE,
NUM_BLOCKS);
- for (int i = 0; i < NUM_BLOCKS; i++) {
- CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
- }
- Waiter.waitFor(new Configuration(), 10000,
- () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
- for (int i = 0; i < NUM_BLOCKS; i++) {
- CACHE.evictBlock(blocks[i].getBlockName());
+ CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE,
numBlocks);
+ for (int i = 0; i < numBlocks; i++) {
+ cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
}
- Waiter.waitFor(new Configuration(), 10000, () ->
MEMCACHED.getCache().getCurrentItems() == 0);
- }
-
- private static MemCacheDaemon<? extends CacheElement> createDaemon(int port)
{
- InetSocketAddress addr = new InetSocketAddress("localhost", port);
- MemCacheDaemon<LocalCacheElement> daemon = new
MemCacheDaemon<LocalCacheElement>();
- ConcurrentLinkedHashMap<Key, LocalCacheElement> cacheStorage =
- ConcurrentLinkedHashMap.create(EvictionPolicy.LRU, 1000, 1024 * 1024);
- daemon.setCache(new CacheImpl(cacheStorage));
- daemon.setAddr(addr);
- daemon.setVerbose(true);
- daemon.start();
- while (!daemon.isRunning()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() ==
numBlocks);
+ for (int i = 0; i < numBlocks; i++) {
+ cache.evictBlock(blocks[i].getBlockName());
}
- return daemon;
+ Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == 0);
}
}
diff --git a/pom.xml b/pom.xml
index d3c27484e18..d22b85890a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -889,7 +889,7 @@
<!--Make sure these joni/jcodings are compatible with the versions used by
jruby-->
<joni.version>2.2.1</joni.version>
<jcodings.version>1.0.58</jcodings.version>
- <spy.version>2.12.2</spy.version>
+ <spy.version>2.12.3</spy.version>
<bouncycastle.version>1.78</bouncycastle.version>
<skyscreamer.version>1.5.1</skyscreamer.version>
<kerby.version>1.0.1</kerby.version>