This is an automated email from the ASF dual-hosted git repository. tswstarplanet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push: new 392f159 [Dubbo-6536]Add a remoting redis to adapt mono, sentinel and cluster redis; refactor redis registry to use the new module (#6614) 392f159 is described below commit 392f15952f46340130ba51b08a02536823a5880a Author: tswstarplanet <tswstarpla...@apache.org> AuthorDate: Sat Aug 22 01:13:39 2020 +0800 [Dubbo-6536]Add a remoting redis to adapt mono, sentinel and cluster redis; refactor redis registry to use the new module (#6614) add remoting-redis module, refactor RedisRegistry --- .../src/main/java/org/apache/dubbo/common/URL.java | 4 - .../java/org/apache/dubbo/common/URLBuilder.java | 3 - .../dubbo/common/constants/CommonConstants.java | 9 + dubbo-dependencies-bom/pom.xml | 2 +- dubbo-metadata/dubbo-metadata-report-redis/pom.xml | 4 - dubbo-registry/dubbo-registry-redis/pom.xml | 5 + .../apache/dubbo/registry/redis/RedisRegistry.java | 327 +++++++-------------- .../dubbo/registry/redis/RedisRegistryTest.java | 9 +- .../dubbo-remoting-redis}/pom.xml | 33 ++- .../apache/dubbo/remoting/redis/RedisClient.java | 46 +++ .../remoting/redis/jedis/ClusterRedisClient.java | 134 +++++++++ .../remoting/redis/jedis/MonoRedisClient.java | 118 ++++++++ .../remoting/redis/jedis/SentinelRedisClient.java | 121 ++++++++ .../redis/support/AbstractRedisClient.java | 94 ++++++ dubbo-remoting/pom.xml | 1 + 15 files changed, 651 insertions(+), 259 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java index 6ab3a89..c38ad58 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java @@ -199,10 +199,6 @@ class URL implements Serializable { String path, Map<String, String> parameters, Map<String, Map<String, String>> methodParameters) { - if (StringUtils.isEmpty(username) - && StringUtils.isNotEmpty(password)) { - throw new IllegalArgumentException("Invalid url, password without username!"); - } this.protocol = protocol; this.username = username; this.password = password; diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java index 20c6c60..eeef637 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java @@ -125,9 +125,6 @@ public final class URLBuilder { } public URL build() { - if (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password)) { - throw new IllegalArgumentException("Invalid url, password without username!"); - } port = port < 0 ? 0 : port; // trim the leading "/" int firstNonSlash = 0; diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java index e2cc73d..1266d92 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java @@ -74,6 +74,8 @@ public interface CommonConstants { Pattern EQUAL_SPLIT_PATTERN = Pattern.compile("\\s*[=]+\\s*"); + Pattern COLON_SPLIT_PATTERN = Pattern.compile("\\s*[:]+\\s*"); + String DEFAULT_PROXY = "javassist"; String DEFAULT_DIRECTORY = "dubbo"; @@ -349,4 +351,11 @@ public interface CommonConstants { */ String DEFAULT_SERVICE_NAME_MAPPING_PROPERTIES_PATH = "META-INF/dubbo/service-name-mapping.properties"; + String REDIS_CLIENT_KEY = "redis-client"; + + String MONO_REDIS = "mono"; + + String SENTINEL_REDIS = "sentinel"; + + String CLUSTER_REDIS = "cluster"; } diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 7a63e3c..ddae94e 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -101,7 +101,7 @@ <zookeeper_version>3.4.13</zookeeper_version> <curator_version>4.0.1</curator_version> <curator_test_version>2.12.0</curator_test_version> - <jedis_version>2.9.0</jedis_version> + <jedis_version>3.3.0</jedis_version> <consul_version>1.4.2</consul_version> <consul_process_version>2.0.0</consul_process_version> <consul_client_version>1.3.7</consul_client_version> diff --git a/dubbo-metadata/dubbo-metadata-report-redis/pom.xml b/dubbo-metadata/dubbo-metadata-report-redis/pom.xml index f42424b..62893ba 100644 --- a/dubbo-metadata/dubbo-metadata-report-redis/pom.xml +++ b/dubbo-metadata/dubbo-metadata-report-redis/pom.xml @@ -24,9 +24,6 @@ <modelVersion>4.0.0</modelVersion> <artifactId>dubbo-metadata-report-redis</artifactId> - <properties> - <jedis.version>2.9.0</jedis.version> - </properties> <dependencies> <dependency> @@ -37,7 +34,6 @@ <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> - <version>${jedis.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> diff --git a/dubbo-registry/dubbo-registry-redis/pom.xml b/dubbo-registry/dubbo-registry-redis/pom.xml index 0971cb8..63725af 100644 --- a/dubbo-registry/dubbo-registry-redis/pom.xml +++ b/dubbo-registry/dubbo-registry-redis/pom.xml @@ -36,6 +36,11 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-remoting-redis</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> diff --git a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java index 0938a39..dcf8752 100644 --- a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java +++ b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java @@ -18,25 +18,21 @@ package org.apache.dubbo.registry.redis; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.URLBuilder; -import org.apache.dubbo.common.constants.RemotingConstants; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ExecutorUtil; import org.apache.dubbo.common.utils.NamedThreadFactory; -import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.support.FailbackRegistry; +import org.apache.dubbo.remoting.redis.RedisClient; +import org.apache.dubbo.remoting.redis.jedis.ClusterRedisClient; +import org.apache.dubbo.remoting.redis.jedis.MonoRedisClient; +import org.apache.dubbo.remoting.redis.jedis.SentinelRedisClient; import org.apache.dubbo.rpc.RpcException; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; -import redis.clients.jedis.JedisSentinelPool; -import redis.clients.util.Pool; import java.util.ArrayList; import java.util.Arrays; @@ -59,10 +55,12 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE; import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; -import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; +import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_REDIS; +import static org.apache.dubbo.common.constants.CommonConstants.MONO_REDIS; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR; -import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.REDIS_CLIENT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.SENTINEL_REDIS; import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY; import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY; @@ -93,7 +91,7 @@ public class RedisRegistry extends FailbackRegistry { private final String root; - private final Map<String, Pool<Jedis>> jedisPools = new ConcurrentHashMap<>(); + private RedisClient redisClient; private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<>(); @@ -107,78 +105,18 @@ public class RedisRegistry extends FailbackRegistry { public RedisRegistry(URL url) { super(url); + String type = url.getParameter(REDIS_CLIENT_KEY, MONO_REDIS); + if (SENTINEL_REDIS.equals(type)) { + redisClient = new SentinelRedisClient(url); + } else if (CLUSTER_REDIS.equals(type)) { + redisClient = new ClusterRedisClient(url); + } else { + redisClient = new MonoRedisClient(url); + } + if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } - GenericObjectPoolConfig config = new GenericObjectPoolConfig(); - config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); - config.setTestOnReturn(url.getParameter("test.on.return", false)); - config.setTestWhileIdle(url.getParameter("test.while.idle", false)); - if (url.getParameter("max.idle", 0) > 0) { - config.setMaxIdle(url.getParameter("max.idle", 0)); - } - if (url.getParameter("min.idle", 0) > 0) { - config.setMinIdle(url.getParameter("min.idle", 0)); - } - if (url.getParameter("max.active", 0) > 0) { - config.setMaxTotal(url.getParameter("max.active", 0)); - } - if (url.getParameter("max.total", 0) > 0) { - config.setMaxTotal(url.getParameter("max.total", 0)); - } - if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) { - config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0))); - } - if (url.getParameter("num.tests.per.eviction.run", 0) > 0) { - config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); - } - if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) { - config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); - } - if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) { - config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); - } - - String cluster = url.getParameter("cluster", "failover"); - if (!"failover".equals(cluster) && !"replicate".equals(cluster)) { - throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate."); - } - replicate = "replicate".equals(cluster); - - List<String> addresses = new ArrayList<>(); - addresses.add(url.getAddress()); - String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]); - if (ArrayUtils.isNotEmpty(backups)) { - addresses.addAll(Arrays.asList(backups)); - } - //获得Redis主节点名称 - String masterName = url.getParameter(REDIS_MASTER_NAME_KEY); - if (StringUtils.isEmpty(masterName)) { - //单机版redis - for (String address : addresses) { - int i = address.indexOf(':'); - String host; - int port; - if (i > 0) { - host = address.substring(0, i); - port = Integer.parseInt(address.substring(i + 1)); - } else { - host = address; - port = DEFAULT_REDIS_PORT; - } - this.jedisPools.put(address, new JedisPool(config, host, port, - url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(), - url.getParameter("db.index", 0))); - } - } else { - //哨兵版redis - Set<String> sentinelSet = new HashSet<>(addresses); - int index = url.getParameter("db.index", 0); - int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); - String password = StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(); - JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinelSet, config, timeout, password, index); - this.jedisPools.put(masterName, pool); - } this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD); String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); @@ -201,37 +139,24 @@ public class RedisRegistry extends FailbackRegistry { } private void deferExpired() { - for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { - Pool<Jedis> jedisPool = entry.getValue(); - try { - try (Jedis jedis = jedisPool.getResource()) { - for (URL url : new HashSet<>(getRegistered())) { - if (url.getParameter(DYNAMIC_KEY, true)) { - String key = toCategoryPath(url); - if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) { - jedis.publish(key, REGISTER); - } - } - } - if (admin) { - clean(jedis); - } - if (!replicate) { - break;// If the server side has synchronized data, just write a single machine - } + for (URL url : new HashSet<>(getRegistered())) { + if (url.getParameter(DYNAMIC_KEY, true)) { + String key = toCategoryPath(url); + if (redisClient.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) { + redisClient.publish(key, REGISTER); } - } catch (Throwable t) { - logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); } } + if (admin) { + clean(); + } } - // The monitoring center is responsible for deleting outdated dirty data - private void clean(Jedis jedis) { - Set<String> keys = jedis.keys(root + ANY_VALUE); + private void clean() { + Set<String> keys = redisClient.scan(root + ANY_VALUE); if (CollectionUtils.isNotEmpty(keys)) { for (String key : keys) { - Map<String, String> values = jedis.hgetAll(key); + Map<String, String> values = redisClient.hgetAll(key); if (CollectionUtils.isNotEmptyMap(values)) { boolean delete = false; long now = System.currentTimeMillis(); @@ -240,7 +165,7 @@ public class RedisRegistry extends FailbackRegistry { if (url.getParameter(DYNAMIC_KEY, true)) { long expire = Long.parseLong(entry.getValue()); if (expire < now) { - jedis.hdel(key, entry.getKey()); + redisClient.hdel(key, entry.getKey()); delete = true; if (logger.isWarnEnabled()) { logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now)); @@ -249,7 +174,7 @@ public class RedisRegistry extends FailbackRegistry { } } if (delete) { - jedis.publish(key, UNREGISTER); + redisClient.publish(key, UNREGISTER); } } } @@ -258,15 +183,7 @@ public class RedisRegistry extends FailbackRegistry { @Override public boolean isAvailable() { - for (Pool<Jedis> jedisPool : jedisPools.values()) { - try (Jedis jedis = jedisPool.getResource()) { - if (jedis.isConnected()) { - return true; // At least one single machine is available. - } - } catch (Throwable t) { - } - } - return false; + return redisClient.isConnected(); } @Override @@ -284,13 +201,10 @@ public class RedisRegistry extends FailbackRegistry { } catch (Throwable t) { logger.warn(t.getMessage(), t); } - for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { - Pool<Jedis> jedisPool = entry.getValue(); - try { - jedisPool.destroy(); - } catch (Throwable t) { - logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); - } + try { + redisClient.destroy(); + } catch (Throwable t) { + logger.warn("Failed to destroy the redis registry client. registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod); } @@ -302,21 +216,13 @@ public class RedisRegistry extends FailbackRegistry { String expire = String.valueOf(System.currentTimeMillis() + expirePeriod); boolean success = false; RpcException exception = null; - for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { - Pool<Jedis> jedisPool = entry.getValue(); - try { - try (Jedis jedis = jedisPool.getResource()) { - jedis.hset(key, value, expire); - jedis.publish(key, REGISTER); - success = true; - if (!replicate) { - break; // If the server side has synchronized data, just write a single machine - } - } - } catch (Throwable t) { - exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); - } + try { + redisClient.hset(key, value, expire); + redisClient.publish(key, REGISTER); + } catch (Throwable t) { + exception = new RpcException("Failed to register service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t); } + if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); @@ -332,21 +238,14 @@ public class RedisRegistry extends FailbackRegistry { String value = url.toFullString(); RpcException exception = null; boolean success = false; - for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { - Pool<Jedis> jedisPool = entry.getValue(); - try { - try (Jedis jedis = jedisPool.getResource()) { - jedis.hdel(key, value); - jedis.publish(key, UNREGISTER); - success = true; - if (!replicate) { - break; // If the server side has synchronized data, just write a single machine - } - } - } catch (Throwable t) { - exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); - } + try { + redisClient.hdel(key, value); + redisClient.publish(key, UNREGISTER); + success = true; + } catch (Throwable t) { + exception = new RpcException("Failed to unregister service to redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t); } + if (exception != null) { if (success) { logger.warn(exception.getMessage(), exception); @@ -370,33 +269,27 @@ public class RedisRegistry extends FailbackRegistry { } boolean success = false; RpcException exception = null; - for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { - Pool<Jedis> jedisPool = entry.getValue(); - try { - try (Jedis jedis = jedisPool.getResource()) { - if (service.endsWith(ANY_VALUE)) { - admin = true; - Set<String> keys = jedis.keys(service); - if (CollectionUtils.isNotEmpty(keys)) { - Map<String, Set<String>> serviceKeys = new HashMap<>(); - for (String key : keys) { - String serviceKey = toServicePath(key); - Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>()); - sk.add(key); - } - for (Set<String> sk : serviceKeys.values()) { - doNotify(jedis, sk, url, Collections.singletonList(listener)); - } - } - } else { - doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener)); + try { + if (service.endsWith(ANY_VALUE)) { + admin = true; + Set<String> keys = redisClient.scan(service); + if (CollectionUtils.isNotEmpty(keys)) { + Map<String, Set<String>> serviceKeys = new HashMap<>(); + for (String key : keys) { + String serviceKey = toServicePath(key); + Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>()); + sk.add(key); + } + for (Set<String> sk : serviceKeys.values()) { + doNotify(sk, url, Collections.singletonList(listener)); } - success = true; - break; // Just read one server's data } - } catch (Throwable t) { // Try the next server - exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); + } else { + doNotify(redisClient.scan(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener)); } + success = true; + } catch (Throwable t) { + exception = new RpcException("Failed to subscribe service from redis registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + t.getMessage(), t); } if (exception != null) { if (success) { @@ -411,13 +304,13 @@ public class RedisRegistry extends FailbackRegistry { public void doUnsubscribe(URL url, NotifyListener listener) { } - private void doNotify(Jedis jedis, String key) { + private void doNotify(String key) { for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) { - doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue())); + doNotify(Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue())); } } - private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) { + private void doNotify(Collection<String> keys, URL url, Collection<NotifyListener> listeners) { if (keys == null || keys.isEmpty() || listeners == null || listeners.isEmpty()) { return; @@ -438,7 +331,7 @@ public class RedisRegistry extends FailbackRegistry { continue; } List<URL> urls = new ArrayList<>(); - Map<String, String> values = jedis.hgetAll(key); + Map<String, String> values = redisClient.hgetAll(key); if (CollectionUtils.isNotEmptyMap(values)) { for (Map.Entry<String, String> entry : values.entrySet()) { URL u = URL.valueOf(entry.getKey()); @@ -500,12 +393,7 @@ public class RedisRegistry extends FailbackRegistry { } private class NotifySub extends JedisPubSub { - - private final Pool<Jedis> jedisPool; - - public NotifySub(Pool<Jedis> jedisPool) { - this.jedisPool = jedisPool; - } + public NotifySub() {} @Override public void onMessage(String key, String msg) { @@ -515,12 +403,7 @@ public class RedisRegistry extends FailbackRegistry { if (msg.equals(REGISTER) || msg.equals(UNREGISTER)) { try { - Jedis jedis = jedisPool.getResource(); - try { - doNotify(jedis, key); - } finally { - jedis.close(); - } + doNotify(key); } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee logger.error(t.getMessage(), t); } @@ -555,7 +438,7 @@ public class RedisRegistry extends FailbackRegistry { private final String service; private final AtomicInteger connectSkip = new AtomicInteger(); private final AtomicInteger connectSkipped = new AtomicInteger(); - private volatile Jedis jedis; + private volatile boolean first = true; private volatile boolean running = true; private volatile int connectRandom; @@ -595,46 +478,34 @@ public class RedisRegistry extends FailbackRegistry { try { if (!isSkip()) { try { - for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) { - Pool<Jedis> jedisPool = entry.getValue(); - try { - if (jedisPool.isClosed()) { - continue; - } - jedis = jedisPool.getResource(); - if (!jedis.isConnected()) { - continue; - } - try { - if (service.endsWith(ANY_VALUE)) { - if (first) { - first = false; - Set<String> keys = jedis.keys(service); - if (CollectionUtils.isNotEmpty(keys)) { - for (String s : keys) { - doNotify(jedis, s); - } - } - resetSkip(); - } - jedis.psubscribe(new NotifySub(jedisPool), service); // blocking - } else { - if (first) { - first = false; - doNotify(jedis, service); - resetSkip(); + if (!redisClient.isConnected()) { + continue; + } + try { + if (service.endsWith(ANY_VALUE)) { + if (first) { + first = false; + Set<String> keys = redisClient.scan(service); + if (CollectionUtils.isNotEmpty(keys)) { + for (String s : keys) { + doNotify(s); } - jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking } - break; - } finally { - jedis.close(); + resetSkip(); + } + redisClient.psubscribe(new NotifySub(), service); + } else { + if (first) { + first = false; + doNotify(service); + resetSkip(); } - } catch (Throwable t) { // Retry another server - logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); - // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources - sleep(reconnectPeriod); + redisClient.psubscribe(new NotifySub(), service + PATH_SEPARATOR + ANY_VALUE); // blocking } + } catch (Throwable t) { // Retry another server + logger.warn("Failed to subscribe service from redis registry. registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); + // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources + sleep(reconnectPeriod); } } catch (Throwable t) { logger.error(t.getMessage(), t); @@ -650,7 +521,7 @@ public class RedisRegistry extends FailbackRegistry { public void shutdown() { try { running = false; - jedis.disconnect(); + redisClient.disconnect(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } diff --git a/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java b/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java index 755e452..c41fe24 100644 --- a/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java +++ b/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import redis.clients.jedis.exceptions.JedisConnectionException; import redis.embedded.RedisServer; import redis.embedded.RedisServerBuilder; @@ -36,6 +37,7 @@ import java.util.Set; import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; public class RedisRegistryTest { @@ -80,7 +82,7 @@ public class RedisRegistryTest { @Test public void testAnyHost() { - Assertions.assertThrows(IllegalStateException.class, () -> { + assertThrows(IllegalStateException.class, () -> { URL errorUrl = URL.valueOf("multicast://0.0.0.0/"); new RedisRegistryFactory().createRegistry(errorUrl); }); @@ -112,7 +114,7 @@ public class RedisRegistryTest { assertThat(redisRegistry.isAvailable(), is(true)); redisRegistry.destroy(); - assertThat(redisRegistry.isAvailable(), is(false)); + assertThrows(JedisConnectionException.class, () -> redisRegistry.isAvailable()); } @Test @@ -120,7 +122,8 @@ public class RedisRegistryTest { URL url = URL.valueOf("redis://redisOne:8880").addParameter(BACKUP_KEY, "redisTwo:8881"); Registry registry = new RedisRegistryFactory().createRegistry(url); - assertThat(registry.isAvailable(), is(false)); + Registry finalRegistry = registry; + assertThrows(JedisConnectionException.class, () -> finalRegistry.isAvailable()); url = URL.valueOf(this.registryUrl.toFullString()).addParameter(BACKUP_KEY, "redisTwo:8881"); registry = new RedisRegistryFactory().createRegistry(url); diff --git a/dubbo-registry/dubbo-registry-redis/pom.xml b/dubbo-remoting/dubbo-remoting-redis/pom.xml similarity index 72% copy from dubbo-registry/dubbo-registry-redis/pom.xml copy to dubbo-remoting/dubbo-remoting-redis/pom.xml index 0971cb8..b30f6a7 100644 --- a/dubbo-registry/dubbo-registry-redis/pom.xml +++ b/dubbo-remoting/dubbo-remoting-redis/pom.xml @@ -14,40 +14,41 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> + <artifactId>dubbo-remoting</artifactId> <groupId>org.apache.dubbo</groupId> - <artifactId>dubbo-registry</artifactId> <version>${revision}</version> <relativePath>../pom.xml</relativePath> </parent> - <artifactId>dubbo-registry-redis</artifactId> + <modelVersion>4.0.0</modelVersion> + + <artifactId>dubbo-remoting-redis</artifactId> + <packaging>jar</packaging> + <name>${project.artifactId}</name> - <description>The redis registry module of dubbo project</description> + <description>The redis remoting module of dubbo project</description> + <properties> <skip_maven_deploy>false</skip_maven_deploy> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> + <dependencies> <dependency> <groupId>org.apache.dubbo</groupId> - <artifactId>dubbo-registry-api</artifactId> + <artifactId>dubbo-remoting-api</artifactId> <version>${project.parent.version}</version> </dependency> <dependency> - <groupId>redis.clients</groupId> - <artifactId>jedis</artifactId> - </dependency> - <dependency> - <groupId>com.github.kstyrc</groupId> - <artifactId>embedded-redis</artifactId> - <scope>test</scope> + <groupId>org.apache.dubbo</groupId> + <artifactId>dubbo-common</artifactId> + <version>${project.parent.version}</version> </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <scope>test</scope> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> </dependency> </dependencies> </project> \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java new file mode 100644 index 0000000..7b2d7b8 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.redis; + +import redis.clients.jedis.JedisPubSub; + +import java.util.Map; +import java.util.Set; + +public interface RedisClient { + Long hset(String key, String field, String value); + + Long publish(String channel, String message); + +// void clean(String pattern); + + boolean isConnected(); + + void destroy(); + + Long hdel(final String key, final String... fields); + + Set<String> scan(String pattern); + + Map<String, String> hgetAll(String key); + + void psubscribe(final JedisPubSub jedisPubSub, final String... patterns); + + void disconnect(); + + void close(); +} diff --git a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java new file mode 100644 index 0000000..b4b104e --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.redis.jedis; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.remoting.redis.RedisClient; +import org.apache.dubbo.remoting.redis.support.AbstractRedisClient; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPubSub; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.dubbo.common.constants.CommonConstants.COLON_SPLIT_PATTERN; +import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; + +public class ClusterRedisClient extends AbstractRedisClient implements RedisClient { + private static final Logger logger = LoggerFactory.getLogger(ClusterRedisClient.class); + + private static final int DEFAULT_TIMEOUT = 2000; + + private static final int DEFAULT_SO_TIMEOUT = 2000; + + private static final int DEFAULT_MAX_ATTEMPTS = 5; + + private JedisCluster jedisCluster; + + public ClusterRedisClient(URL url) { + super(url); + Set<HostAndPort> nodes = getNodes(url); + jedisCluster = new JedisCluster(nodes, url.getParameter("connection.timeout", DEFAULT_TIMEOUT), + url.getParameter("so.timeout", DEFAULT_SO_TIMEOUT), url.getParameter("max.attempts", DEFAULT_MAX_ATTEMPTS), + url.getPassword(), getConfig()); + } + + @Override + public Long hset(String key, String field, String value) { + return jedisCluster.hset(key, field, value); + } + + @Override + public Long publish(String channel, String message) { + return jedisCluster.publish(channel, message); + } + + @Override + public boolean isConnected() { + Map<String, JedisPool> poolMap = jedisCluster.getClusterNodes(); + for (JedisPool jedisPool : poolMap.values()) { + Jedis jedis = jedisPool.getResource(); + if (jedis.isConnected()) { + return true; + } + } + return false; + } + + @Override + public void destroy() { + jedisCluster.close(); + } + + @Override + public Long hdel(String key, String... fields) { + return jedisCluster.hdel(key, fields); + } + + @Override + public Set<String> scan(String pattern) { + Map<String, JedisPool> nodes = jedisCluster.getClusterNodes(); + Set<String> result = new HashSet<>(); + for (JedisPool jedisPool : nodes.values()) { + Jedis jedis = jedisPool.getResource(); + result.addAll(scan(jedis, pattern)); + jedis.close(); + } + return result; + } + + @Override + public Map<String, String> hgetAll(String key) { + return jedisCluster.hgetAll(key); + } + + @Override + public void psubscribe(JedisPubSub jedisPubSub, String... patterns) { + jedisCluster.psubscribe(jedisPubSub, patterns); + } + + @Override + public void disconnect() { + jedisCluster.close(); + } + + @Override + public void close() { + jedisCluster.close(); + } + + private Set<HostAndPort> getNodes(URL url) { + Set<HostAndPort> hostAndPorts = new HashSet<>(); + hostAndPorts.add(new HostAndPort(url.getHost(), url.getPort())); + String backupAddresses = url.getBackupAddress(6379); + String[] nodes = StringUtils.isEmpty(backupAddresses) ? new String[0] : COMMA_SPLIT_PATTERN.split(backupAddresses); + if (nodes.length > 0) { + for (String node : nodes) { + String[] hostAndPort = COLON_SPLIT_PATTERN.split(node); + hostAndPorts.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1]))); + } + } + return hostAndPorts; + } +} diff --git a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java new file mode 100644 index 0000000..d69c565 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.redis.jedis; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.remoting.redis.RedisClient; +import org.apache.dubbo.remoting.redis.support.AbstractRedisClient; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPubSub; + +import java.util.Map; +import java.util.Set; + +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; +import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; + +public class MonoRedisClient extends AbstractRedisClient implements RedisClient { + private static final Logger logger = LoggerFactory.getLogger(MonoRedisClient.class); + + private static final String START_CURSOR = "0"; + + private JedisPool jedisPool; + + public MonoRedisClient(URL url) { + super(url); + jedisPool = new JedisPool(getConfig(), url.getHost(), url.getPort(), + url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), url.getPassword()); + } + + @Override + public Long hset(String key, String field, String value) { + Jedis jedis = jedisPool.getResource(); + Long result = jedis.hset(key, field, value); + jedis.close(); + return result; + } + + @Override + public Long publish(String channel, String message) { + Jedis jedis = jedisPool.getResource(); + Long result = jedis.publish(channel, message); + jedis.close(); + return result; + } + + @Override + public boolean isConnected() { + Jedis jedis = jedisPool.getResource(); + boolean connected = jedis.isConnected(); + jedis.close(); + return connected; + } + + @Override + public void destroy() { + jedisPool.close(); + } + + @Override + public Long hdel(String key, String... fields) { + Jedis jedis = jedisPool.getResource(); + Long result = jedis.hdel(key, fields); + jedis.close(); + return result; + } + + @Override + public Set<String> scan(String pattern) { + Jedis jedis = jedisPool.getResource(); + Set<String> result = super.scan(jedis, pattern); + jedis.close(); + return result; + } + + @Override + public Map<String, String> hgetAll(String key) { + Jedis jedis = jedisPool.getResource(); + Map<String, String> result = jedis.hgetAll(key); + jedis.close(); + return result; + } + + @Override + public void psubscribe(JedisPubSub jedisPubSub, String... patterns) { + Jedis jedis = jedisPool.getResource(); + jedis.psubscribe(jedisPubSub, patterns); + jedis.close(); + } + + @Override + public void disconnect() { + jedisPool.close(); + } + + @Override + public void close() { + jedisPool.close(); + } + + +} diff --git a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java new file mode 100644 index 0000000..ba3c890 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.redis.jedis; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.constants.RemotingConstants; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.remoting.redis.RedisClient; +import org.apache.dubbo.remoting.redis.support.AbstractRedisClient; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.JedisSentinelPool; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class SentinelRedisClient extends AbstractRedisClient implements RedisClient { + private static final Logger logger = LoggerFactory.getLogger(SentinelRedisClient.class); + + private JedisSentinelPool sentinelPool; + + public SentinelRedisClient(URL url) { + super(url); + String masterName = url.getParameter("master.name", "Sentinel-master"); + String address = (new StringBuilder()).append(url.getAddress()).toString(); + String[] backupAddresses = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]); + if (backupAddresses.length == 0) { + throw new IllegalStateException("Sentinel addresses can not be empty"); + } + Set<String> sentinels = new HashSet<>(Arrays.asList(backupAddresses)); + sentinels.add(address); + sentinelPool = new JedisSentinelPool(masterName, sentinels, getConfig(), url.getPassword()); + } + + @Override + public Long hset(String key, String field, String value) { + Jedis jedis = sentinelPool.getResource(); + Long result = jedis.hset(key, field, value); + jedis.close(); + return result; + } + + @Override + public Long publish(String channel, String message) { + Jedis jedis = sentinelPool.getResource(); + Long result = jedis.publish(channel, message); + jedis.close(); + return result; + } + + @Override + public boolean isConnected() { + Jedis jedis = sentinelPool.getResource(); + boolean result = jedis.isConnected(); + jedis.close(); + return result; + } + + @Override + public void destroy() { + sentinelPool.close(); + } + + @Override + public Long hdel(String key, String... fields) { + Jedis jedis = sentinelPool.getResource(); + Long result = jedis.hdel(key, fields); + jedis.close(); + return result; + } + + @Override + public Set<String> scan(String pattern) { + Jedis jedis = sentinelPool.getResource(); + Set<String> result = scan(jedis, pattern); + jedis.close(); + return result; + } + + @Override + public Map<String, String> hgetAll(String key) { + Jedis jedis = sentinelPool.getResource(); + Map<String, String> result = jedis.hgetAll(key); + jedis.close(); + return result; + } + + @Override + public void psubscribe(JedisPubSub jedisPubSub, String... patterns) { + Jedis jedis = sentinelPool.getResource(); + jedis.psubscribe(jedisPubSub, patterns); + jedis.close(); + } + + @Override + public void disconnect() { + sentinelPool.close(); + } + + @Override + public void close() { + sentinelPool.close(); + } +} diff --git a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java new file mode 100644 index 0000000..e09fd1c --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.redis.support; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.CollectionUtils; +import org.apache.dubbo.remoting.redis.RedisClient; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.ScanParams; +import redis.clients.jedis.ScanResult; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractRedisClient implements RedisClient { + private URL url; + + private JedisPoolConfig config; + + public AbstractRedisClient(URL url) { + this.url = url; + config = new JedisPoolConfig(); + config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); + config.setTestOnReturn(url.getParameter("test.on.return", false)); + config.setTestWhileIdle(url.getParameter("test.while.idle", false)); + if (url.getParameter("max.idle", 0) > 0) { + config.setMaxIdle(url.getParameter("max.idle", 0)); + } + if (url.getParameter("min.idle", 0) > 0) { + config.setMinIdle(url.getParameter("min.idle", 0)); + } + if (url.getParameter("max.active", 0) > 0) { + config.setMaxTotal(url.getParameter("max.active", 0)); + } + if (url.getParameter("max.total", 0) > 0) { + config.setMaxTotal(url.getParameter("max.total", 0)); + } + if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) { + config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0))); + } + if (url.getParameter("num.tests.per.eviction.run", 0) > 0) { + config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); + } + if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) { + config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); + } + if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) { + config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); + } + } + + protected Set<String> scan(Jedis jedis, String pattern) { + Set<String> result = new HashSet<>(); + String cursor = ScanParams.SCAN_POINTER_START; + ScanParams params = new ScanParams(); + params.match(pattern); + while (true) { + ScanResult<String> scanResult = jedis.scan(cursor, params); + List<String> list = scanResult.getResult(); + if (CollectionUtils.isNotEmpty(list)) { + result.addAll(list); + } + if (ScanParams.SCAN_POINTER_START.equals(scanResult.getCursor())) { + break; + } + cursor = scanResult.getCursor(); + } + return result; + } + + public URL getUrl() { + return url; + } + + public JedisPoolConfig getConfig() { + return config; + } +} diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml index 19970ad..1b0524e 100644 --- a/dubbo-remoting/pom.xml +++ b/dubbo-remoting/pom.xml @@ -40,5 +40,6 @@ <module>dubbo-remoting-zookeeper</module> <module>dubbo-remoting-netty4</module> <module>dubbo-remoting-etcd3</module> + <module>dubbo-remoting-redis</module> </modules> </project> \ No newline at end of file