This is an automated email from the ASF dual-hosted git repository.
tswstarplanet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 392f159 [Dubbo-6536]Add a remoting redis to adapt mono, sentinel and
cluster redis; refactor redis registry to use the new module (#6614)
392f159 is described below
commit 392f15952f46340130ba51b08a02536823a5880a
Author: tswstarplanet <[email protected]>
AuthorDate: Sat Aug 22 01:13:39 2020 +0800
[Dubbo-6536]Add a remoting redis to adapt mono, sentinel and cluster redis;
refactor redis registry to use the new module (#6614)
add remoting-redis module, refactor RedisRegistry
---
.../src/main/java/org/apache/dubbo/common/URL.java | 4 -
.../java/org/apache/dubbo/common/URLBuilder.java | 3 -
.../dubbo/common/constants/CommonConstants.java | 9 +
dubbo-dependencies-bom/pom.xml | 2 +-
dubbo-metadata/dubbo-metadata-report-redis/pom.xml | 4 -
dubbo-registry/dubbo-registry-redis/pom.xml | 5 +
.../apache/dubbo/registry/redis/RedisRegistry.java | 327 +++++++--------------
.../dubbo/registry/redis/RedisRegistryTest.java | 9 +-
.../dubbo-remoting-redis}/pom.xml | 33 ++-
.../apache/dubbo/remoting/redis/RedisClient.java | 46 +++
.../remoting/redis/jedis/ClusterRedisClient.java | 134 +++++++++
.../remoting/redis/jedis/MonoRedisClient.java | 118 ++++++++
.../remoting/redis/jedis/SentinelRedisClient.java | 121 ++++++++
.../redis/support/AbstractRedisClient.java | 94 ++++++
dubbo-remoting/pom.xml | 1 +
15 files changed, 651 insertions(+), 259 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index 6ab3a89..c38ad58 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -199,10 +199,6 @@ class URL implements Serializable {
String path,
Map<String, String> parameters,
Map<String, Map<String, String>> methodParameters) {
- if (StringUtils.isEmpty(username)
- && StringUtils.isNotEmpty(password)) {
- throw new IllegalArgumentException("Invalid url, password without
username!");
- }
this.protocol = protocol;
this.username = username;
this.password = password;
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
index 20c6c60..eeef637 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
@@ -125,9 +125,6 @@ public final class URLBuilder {
}
public URL build() {
- if (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password))
{
- throw new IllegalArgumentException("Invalid url, password without
username!");
- }
port = port < 0 ? 0 : port;
// trim the leading "/"
int firstNonSlash = 0;
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index e2cc73d..1266d92 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -74,6 +74,8 @@ public interface CommonConstants {
Pattern EQUAL_SPLIT_PATTERN = Pattern.compile("\\s*[=]+\\s*");
+ Pattern COLON_SPLIT_PATTERN = Pattern.compile("\\s*[:]+\\s*");
+
String DEFAULT_PROXY = "javassist";
String DEFAULT_DIRECTORY = "dubbo";
@@ -349,4 +351,11 @@ public interface CommonConstants {
*/
String DEFAULT_SERVICE_NAME_MAPPING_PROPERTIES_PATH =
"META-INF/dubbo/service-name-mapping.properties";
+ String REDIS_CLIENT_KEY = "redis-client";
+
+ String MONO_REDIS = "mono";
+
+ String SENTINEL_REDIS = "sentinel";
+
+ String CLUSTER_REDIS = "cluster";
}
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 7a63e3c..ddae94e 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -101,7 +101,7 @@
<zookeeper_version>3.4.13</zookeeper_version>
<curator_version>4.0.1</curator_version>
<curator_test_version>2.12.0</curator_test_version>
- <jedis_version>2.9.0</jedis_version>
+ <jedis_version>3.3.0</jedis_version>
<consul_version>1.4.2</consul_version>
<consul_process_version>2.0.0</consul_process_version>
<consul_client_version>1.3.7</consul_client_version>
diff --git a/dubbo-metadata/dubbo-metadata-report-redis/pom.xml
b/dubbo-metadata/dubbo-metadata-report-redis/pom.xml
index f42424b..62893ba 100644
--- a/dubbo-metadata/dubbo-metadata-report-redis/pom.xml
+++ b/dubbo-metadata/dubbo-metadata-report-redis/pom.xml
@@ -24,9 +24,6 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-metadata-report-redis</artifactId>
- <properties>
- <jedis.version>2.9.0</jedis.version>
- </properties>
<dependencies>
<dependency>
@@ -37,7 +34,6 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
- <version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
diff --git a/dubbo-registry/dubbo-registry-redis/pom.xml
b/dubbo-registry/dubbo-registry-redis/pom.xml
index 0971cb8..63725af 100644
--- a/dubbo-registry/dubbo-registry-redis/pom.xml
+++ b/dubbo-registry/dubbo-registry-redis/pom.xml
@@ -36,6 +36,11 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-remoting-redis</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
diff --git
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
index 0938a39..dcf8752 100644
---
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
+++
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -18,25 +18,21 @@ package org.apache.dubbo.registry.redis;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
-import org.apache.dubbo.common.constants.RemotingConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.jedis.ClusterRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.MonoRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.SentinelRedisClient;
import org.apache.dubbo.rpc.RpcException;
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
-import redis.clients.jedis.JedisSentinelPool;
-import redis.clients.util.Pool;
import java.util.ArrayList;
import java.util.Arrays;
@@ -59,10 +55,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
-import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_REDIS;
+import static org.apache.dubbo.common.constants.CommonConstants.MONO_REDIS;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.REDIS_CLIENT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.SENTINEL_REDIS;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static
org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
@@ -93,7 +91,7 @@ public class RedisRegistry extends FailbackRegistry {
private final String root;
- private final Map<String, Pool<Jedis>> jedisPools = new
ConcurrentHashMap<>();
+ private RedisClient redisClient;
private final ConcurrentMap<String, Notifier> notifiers = new
ConcurrentHashMap<>();
@@ -107,78 +105,18 @@ public class RedisRegistry extends FailbackRegistry {
public RedisRegistry(URL url) {
super(url);
+ String type = url.getParameter(REDIS_CLIENT_KEY, MONO_REDIS);
+ if (SENTINEL_REDIS.equals(type)) {
+ redisClient = new SentinelRedisClient(url);
+ } else if (CLUSTER_REDIS.equals(type)) {
+ redisClient = new ClusterRedisClient(url);
+ } else {
+ redisClient = new MonoRedisClient(url);
+ }
+
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
- GenericObjectPoolConfig config = new GenericObjectPoolConfig();
- config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
- config.setTestOnReturn(url.getParameter("test.on.return", false));
- config.setTestWhileIdle(url.getParameter("test.while.idle", false));
- if (url.getParameter("max.idle", 0) > 0) {
- config.setMaxIdle(url.getParameter("max.idle", 0));
- }
- if (url.getParameter("min.idle", 0) > 0) {
- config.setMinIdle(url.getParameter("min.idle", 0));
- }
- if (url.getParameter("max.active", 0) > 0) {
- config.setMaxTotal(url.getParameter("max.active", 0));
- }
- if (url.getParameter("max.total", 0) > 0) {
- config.setMaxTotal(url.getParameter("max.total", 0));
- }
- if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
- config.setMaxWaitMillis(url.getParameter("max.wait",
url.getParameter("timeout", 0)));
- }
- if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
-
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run",
0));
- }
- if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
-
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis",
0));
- }
- if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
-
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis",
0));
- }
-
- String cluster = url.getParameter("cluster", "failover");
- if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
- throw new IllegalArgumentException("Unsupported redis cluster: " +
cluster + ". The redis cluster only supported failover or replicate.");
- }
- replicate = "replicate".equals(cluster);
-
- List<String> addresses = new ArrayList<>();
- addresses.add(url.getAddress());
- String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new
String[0]);
- if (ArrayUtils.isNotEmpty(backups)) {
- addresses.addAll(Arrays.asList(backups));
- }
- //获得Redis主节点名称
- String masterName = url.getParameter(REDIS_MASTER_NAME_KEY);
- if (StringUtils.isEmpty(masterName)) {
- //单机版redis
- for (String address : addresses) {
- int i = address.indexOf(':');
- String host;
- int port;
- if (i > 0) {
- host = address.substring(0, i);
- port = Integer.parseInt(address.substring(i + 1));
- } else {
- host = address;
- port = DEFAULT_REDIS_PORT;
- }
- this.jedisPools.put(address, new JedisPool(config, host, port,
- url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT),
StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
- url.getParameter("db.index", 0)));
- }
- } else {
- //哨兵版redis
- Set<String> sentinelSet = new HashSet<>(addresses);
- int index = url.getParameter("db.index", 0);
- int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
- String password = StringUtils.isEmpty(url.getPassword()) ? null :
url.getPassword();
- JedisSentinelPool pool = new JedisSentinelPool(masterName,
sentinelSet, config, timeout, password, index);
- this.jedisPools.put(masterName, pool);
- }
this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY,
DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
@@ -201,37 +139,24 @@ public class RedisRegistry extends FailbackRegistry {
}
private void deferExpired() {
- for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
- Pool<Jedis> jedisPool = entry.getValue();
- try {
- try (Jedis jedis = jedisPool.getResource()) {
- for (URL url : new HashSet<>(getRegistered())) {
- if (url.getParameter(DYNAMIC_KEY, true)) {
- String key = toCategoryPath(url);
- if (jedis.hset(key, url.toFullString(),
String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
- jedis.publish(key, REGISTER);
- }
- }
- }
- if (admin) {
- clean(jedis);
- }
- if (!replicate) {
- break;// If the server side has synchronized data,
just write a single machine
- }
+ for (URL url : new HashSet<>(getRegistered())) {
+ if (url.getParameter(DYNAMIC_KEY, true)) {
+ String key = toCategoryPath(url);
+ if (redisClient.hset(key, url.toFullString(),
String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
+ redisClient.publish(key, REGISTER);
}
- } catch (Throwable t) {
- logger.warn("Failed to write provider heartbeat to redis
registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
+ if (admin) {
+ clean();
+ }
}
- // The monitoring center is responsible for deleting outdated dirty data
- private void clean(Jedis jedis) {
- Set<String> keys = jedis.keys(root + ANY_VALUE);
+ private void clean() {
+ Set<String> keys = redisClient.scan(root + ANY_VALUE);
if (CollectionUtils.isNotEmpty(keys)) {
for (String key : keys) {
- Map<String, String> values = jedis.hgetAll(key);
+ Map<String, String> values = redisClient.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
boolean delete = false;
long now = System.currentTimeMillis();
@@ -240,7 +165,7 @@ public class RedisRegistry extends FailbackRegistry {
if (url.getParameter(DYNAMIC_KEY, true)) {
long expire = Long.parseLong(entry.getValue());
if (expire < now) {
- jedis.hdel(key, entry.getKey());
+ redisClient.hdel(key, entry.getKey());
delete = true;
if (logger.isWarnEnabled()) {
logger.warn("Delete expired key: " + key +
" -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " +
new Date(now));
@@ -249,7 +174,7 @@ public class RedisRegistry extends FailbackRegistry {
}
}
if (delete) {
- jedis.publish(key, UNREGISTER);
+ redisClient.publish(key, UNREGISTER);
}
}
}
@@ -258,15 +183,7 @@ public class RedisRegistry extends FailbackRegistry {
@Override
public boolean isAvailable() {
- for (Pool<Jedis> jedisPool : jedisPools.values()) {
- try (Jedis jedis = jedisPool.getResource()) {
- if (jedis.isConnected()) {
- return true; // At least one single machine is available.
- }
- } catch (Throwable t) {
- }
- }
- return false;
+ return redisClient.isConnected();
}
@Override
@@ -284,13 +201,10 @@ public class RedisRegistry extends FailbackRegistry {
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
- for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
- Pool<Jedis> jedisPool = entry.getValue();
- try {
- jedisPool.destroy();
- } catch (Throwable t) {
- logger.warn("Failed to destroy the redis registry client.
registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
- }
+ try {
+ redisClient.destroy();
+ } catch (Throwable t) {
+ logger.warn("Failed to destroy the redis registry client.
registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
}
ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
}
@@ -302,21 +216,13 @@ public class RedisRegistry extends FailbackRegistry {
String expire = String.valueOf(System.currentTimeMillis() +
expirePeriod);
boolean success = false;
RpcException exception = null;
- for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
- Pool<Jedis> jedisPool = entry.getValue();
- try {
- try (Jedis jedis = jedisPool.getResource()) {
- jedis.hset(key, value, expire);
- jedis.publish(key, REGISTER);
- success = true;
- if (!replicate) {
- break; // If the server side has synchronized data,
just write a single machine
- }
- }
- } catch (Throwable t) {
- exception = new RpcException("Failed to register service to
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause:
" + t.getMessage(), t);
- }
+ try {
+ redisClient.hset(key, value, expire);
+ redisClient.publish(key, REGISTER);
+ } catch (Throwable t) {
+ exception = new RpcException("Failed to register service to redis
registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " +
t.getMessage(), t);
}
+
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
@@ -332,21 +238,14 @@ public class RedisRegistry extends FailbackRegistry {
String value = url.toFullString();
RpcException exception = null;
boolean success = false;
- for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
- Pool<Jedis> jedisPool = entry.getValue();
- try {
- try (Jedis jedis = jedisPool.getResource()) {
- jedis.hdel(key, value);
- jedis.publish(key, UNREGISTER);
- success = true;
- if (!replicate) {
- break; // If the server side has synchronized data,
just write a single machine
- }
- }
- } catch (Throwable t) {
- exception = new RpcException("Failed to unregister service to
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause:
" + t.getMessage(), t);
- }
+ try {
+ redisClient.hdel(key, value);
+ redisClient.publish(key, UNREGISTER);
+ success = true;
+ } catch (Throwable t) {
+ exception = new RpcException("Failed to unregister service to
redis registry. registry: " + url.getAddress() + ", service: " + url + ",
cause: " + t.getMessage(), t);
}
+
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
@@ -370,33 +269,27 @@ public class RedisRegistry extends FailbackRegistry {
}
boolean success = false;
RpcException exception = null;
- for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
- Pool<Jedis> jedisPool = entry.getValue();
- try {
- try (Jedis jedis = jedisPool.getResource()) {
- if (service.endsWith(ANY_VALUE)) {
- admin = true;
- Set<String> keys = jedis.keys(service);
- if (CollectionUtils.isNotEmpty(keys)) {
- Map<String, Set<String>> serviceKeys = new
HashMap<>();
- for (String key : keys) {
- String serviceKey = toServicePath(key);
- Set<String> sk =
serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
- sk.add(key);
- }
- for (Set<String> sk : serviceKeys.values()) {
- doNotify(jedis, sk, url,
Collections.singletonList(listener));
- }
- }
- } else {
- doNotify(jedis, jedis.keys(service + PATH_SEPARATOR +
ANY_VALUE), url, Collections.singletonList(listener));
+ try {
+ if (service.endsWith(ANY_VALUE)) {
+ admin = true;
+ Set<String> keys = redisClient.scan(service);
+ if (CollectionUtils.isNotEmpty(keys)) {
+ Map<String, Set<String>> serviceKeys = new HashMap<>();
+ for (String key : keys) {
+ String serviceKey = toServicePath(key);
+ Set<String> sk =
serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
+ sk.add(key);
+ }
+ for (Set<String> sk : serviceKeys.values()) {
+ doNotify(sk, url, Collections.singletonList(listener));
}
- success = true;
- break; // Just read one server's data
}
- } catch (Throwable t) { // Try the next server
- exception = new RpcException("Failed to subscribe service from
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause:
" + t.getMessage(), t);
+ } else {
+ doNotify(redisClient.scan(service + PATH_SEPARATOR +
ANY_VALUE), url, Collections.singletonList(listener));
}
+ success = true;
+ } catch (Throwable t) {
+ exception = new RpcException("Failed to subscribe service from
redis registry. registry: " + url.getAddress() + ", service: " + url + ",
cause: " + t.getMessage(), t);
}
if (exception != null) {
if (success) {
@@ -411,13 +304,13 @@ public class RedisRegistry extends FailbackRegistry {
public void doUnsubscribe(URL url, NotifyListener listener) {
}
- private void doNotify(Jedis jedis, String key) {
+ private void doNotify(String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new
HashMap<>(getSubscribed()).entrySet()) {
- doNotify(jedis, Collections.singletonList(key), entry.getKey(),
new HashSet<>(entry.getValue()));
+ doNotify(Collections.singletonList(key), entry.getKey(), new
HashSet<>(entry.getValue()));
}
}
- private void doNotify(Jedis jedis, Collection<String> keys, URL url,
Collection<NotifyListener> listeners) {
+ private void doNotify(Collection<String> keys, URL url,
Collection<NotifyListener> listeners) {
if (keys == null || keys.isEmpty()
|| listeners == null || listeners.isEmpty()) {
return;
@@ -438,7 +331,7 @@ public class RedisRegistry extends FailbackRegistry {
continue;
}
List<URL> urls = new ArrayList<>();
- Map<String, String> values = jedis.hgetAll(key);
+ Map<String, String> values = redisClient.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
URL u = URL.valueOf(entry.getKey());
@@ -500,12 +393,7 @@ public class RedisRegistry extends FailbackRegistry {
}
private class NotifySub extends JedisPubSub {
-
- private final Pool<Jedis> jedisPool;
-
- public NotifySub(Pool<Jedis> jedisPool) {
- this.jedisPool = jedisPool;
- }
+ public NotifySub() {}
@Override
public void onMessage(String key, String msg) {
@@ -515,12 +403,7 @@ public class RedisRegistry extends FailbackRegistry {
if (msg.equals(REGISTER)
|| msg.equals(UNREGISTER)) {
try {
- Jedis jedis = jedisPool.getResource();
- try {
- doNotify(jedis, key);
- } finally {
- jedis.close();
- }
+ doNotify(key);
} catch (Throwable t) { // TODO Notification failure does not
restore mechanism guarantee
logger.error(t.getMessage(), t);
}
@@ -555,7 +438,7 @@ public class RedisRegistry extends FailbackRegistry {
private final String service;
private final AtomicInteger connectSkip = new AtomicInteger();
private final AtomicInteger connectSkipped = new AtomicInteger();
- private volatile Jedis jedis;
+
private volatile boolean first = true;
private volatile boolean running = true;
private volatile int connectRandom;
@@ -595,46 +478,34 @@ public class RedisRegistry extends FailbackRegistry {
try {
if (!isSkip()) {
try {
- for (Map.Entry<String, Pool<Jedis>> entry :
jedisPools.entrySet()) {
- Pool<Jedis> jedisPool = entry.getValue();
- try {
- if (jedisPool.isClosed()) {
- continue;
- }
- jedis = jedisPool.getResource();
- if (!jedis.isConnected()) {
- continue;
- }
- try {
- if (service.endsWith(ANY_VALUE)) {
- if (first) {
- first = false;
- Set<String> keys =
jedis.keys(service);
- if
(CollectionUtils.isNotEmpty(keys)) {
- for (String s : keys) {
- doNotify(jedis, s);
- }
- }
- resetSkip();
- }
- jedis.psubscribe(new
NotifySub(jedisPool), service); // blocking
- } else {
- if (first) {
- first = false;
- doNotify(jedis, service);
- resetSkip();
+ if (!redisClient.isConnected()) {
+ continue;
+ }
+ try {
+ if (service.endsWith(ANY_VALUE)) {
+ if (first) {
+ first = false;
+ Set<String> keys =
redisClient.scan(service);
+ if (CollectionUtils.isNotEmpty(keys)) {
+ for (String s : keys) {
+ doNotify(s);
}
- jedis.psubscribe(new
NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking
}
- break;
- } finally {
- jedis.close();
+ resetSkip();
+ }
+ redisClient.psubscribe(new NotifySub(),
service);
+ } else {
+ if (first) {
+ first = false;
+ doNotify(service);
+ resetSkip();
}
- } catch (Throwable t) { // Retry another server
- logger.warn("Failed to subscribe service
from redis registry. registry: " + entry.getKey() + ", cause: " +
t.getMessage(), t);
- // If you only have a single redis, you
need to take a rest to avoid overtaking a lot of CPU resources
- sleep(reconnectPeriod);
+ redisClient.psubscribe(new NotifySub(),
service + PATH_SEPARATOR + ANY_VALUE); // blocking
}
+ } catch (Throwable t) { // Retry another server
+ logger.warn("Failed to subscribe service from
redis registry. registry: " + getUrl().getAddress() + ", cause: " +
t.getMessage(), t);
+ // If you only have a single redis, you need
to take a rest to avoid overtaking a lot of CPU resources
+ sleep(reconnectPeriod);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
@@ -650,7 +521,7 @@ public class RedisRegistry extends FailbackRegistry {
public void shutdown() {
try {
running = false;
- jedis.disconnect();
+ redisClient.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
diff --git
a/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
b/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
index 755e452..c41fe24 100644
---
a/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
+++
b/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.embedded.RedisServer;
import redis.embedded.RedisServerBuilder;
@@ -36,6 +37,7 @@ import java.util.Set;
import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class RedisRegistryTest {
@@ -80,7 +82,7 @@ public class RedisRegistryTest {
@Test
public void testAnyHost() {
- Assertions.assertThrows(IllegalStateException.class, () -> {
+ assertThrows(IllegalStateException.class, () -> {
URL errorUrl = URL.valueOf("multicast://0.0.0.0/");
new RedisRegistryFactory().createRegistry(errorUrl);
});
@@ -112,7 +114,7 @@ public class RedisRegistryTest {
assertThat(redisRegistry.isAvailable(), is(true));
redisRegistry.destroy();
- assertThat(redisRegistry.isAvailable(), is(false));
+ assertThrows(JedisConnectionException.class, () ->
redisRegistry.isAvailable());
}
@Test
@@ -120,7 +122,8 @@ public class RedisRegistryTest {
URL url =
URL.valueOf("redis://redisOne:8880").addParameter(BACKUP_KEY, "redisTwo:8881");
Registry registry = new RedisRegistryFactory().createRegistry(url);
- assertThat(registry.isAvailable(), is(false));
+ Registry finalRegistry = registry;
+ assertThrows(JedisConnectionException.class, () ->
finalRegistry.isAvailable());
url =
URL.valueOf(this.registryUrl.toFullString()).addParameter(BACKUP_KEY,
"redisTwo:8881");
registry = new RedisRegistryFactory().createRegistry(url);
diff --git a/dubbo-registry/dubbo-registry-redis/pom.xml
b/dubbo-remoting/dubbo-remoting-redis/pom.xml
similarity index 72%
copy from dubbo-registry/dubbo-registry-redis/pom.xml
copy to dubbo-remoting/dubbo-remoting-redis/pom.xml
index 0971cb8..b30f6a7 100644
--- a/dubbo-registry/dubbo-registry-redis/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-redis/pom.xml
@@ -14,40 +14,41 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>dubbo-remoting</artifactId>
<groupId>org.apache.dubbo</groupId>
- <artifactId>dubbo-registry</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>dubbo-registry-redis</artifactId>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dubbo-remoting-redis</artifactId>
+
<packaging>jar</packaging>
+
<name>${project.artifactId}</name>
- <description>The redis registry module of dubbo project</description>
+ <description>The redis remoting module of dubbo project</description>
+
<properties>
<skip_maven_deploy>false</skip_maven_deploy>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
+
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
- <artifactId>dubbo-registry-api</artifactId>
+ <artifactId>dubbo-remoting-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- </dependency>
- <dependency>
- <groupId>com.github.kstyrc</groupId>
- <artifactId>embedded-redis</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ <version>${project.parent.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <scope>test</scope>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
new file mode 100644
index 0000000..7b2d7b8
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis;
+
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RedisClient {
+ Long hset(String key, String field, String value);
+
+ Long publish(String channel, String message);
+
+// void clean(String pattern);
+
+ boolean isConnected();
+
+ void destroy();
+
+ Long hdel(final String key, final String... fields);
+
+ Set<String> scan(String pattern);
+
+ Map<String, String> hgetAll(String key);
+
+ void psubscribe(final JedisPubSub jedisPubSub, final String... patterns);
+
+ void disconnect();
+
+ void close();
+}
diff --git
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
new file mode 100644
index 0000000..b4b104e
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.dubbo.common.constants.CommonConstants.COLON_SPLIT_PATTERN;
+import static
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+
+public class ClusterRedisClient extends AbstractRedisClient implements
RedisClient {
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterRedisClient.class);
+
+ private static final int DEFAULT_TIMEOUT = 2000;
+
+ private static final int DEFAULT_SO_TIMEOUT = 2000;
+
+ private static final int DEFAULT_MAX_ATTEMPTS = 5;
+
+ private JedisCluster jedisCluster;
+
+ public ClusterRedisClient(URL url) {
+ super(url);
+ Set<HostAndPort> nodes = getNodes(url);
+ jedisCluster = new JedisCluster(nodes,
url.getParameter("connection.timeout", DEFAULT_TIMEOUT),
+ url.getParameter("so.timeout", DEFAULT_SO_TIMEOUT),
url.getParameter("max.attempts", DEFAULT_MAX_ATTEMPTS),
+ url.getPassword(), getConfig());
+ }
+
+ @Override
+ public Long hset(String key, String field, String value) {
+ return jedisCluster.hset(key, field, value);
+ }
+
+ @Override
+ public Long publish(String channel, String message) {
+ return jedisCluster.publish(channel, message);
+ }
+
+ @Override
+ public boolean isConnected() {
+ Map<String, JedisPool> poolMap = jedisCluster.getClusterNodes();
+ for (JedisPool jedisPool : poolMap.values()) {
+ Jedis jedis = jedisPool.getResource();
+ if (jedis.isConnected()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void destroy() {
+ jedisCluster.close();
+ }
+
+ @Override
+ public Long hdel(String key, String... fields) {
+ return jedisCluster.hdel(key, fields);
+ }
+
+ @Override
+ public Set<String> scan(String pattern) {
+ Map<String, JedisPool> nodes = jedisCluster.getClusterNodes();
+ Set<String> result = new HashSet<>();
+ for (JedisPool jedisPool : nodes.values()) {
+ Jedis jedis = jedisPool.getResource();
+ result.addAll(scan(jedis, pattern));
+ jedis.close();
+ }
+ return result;
+ }
+
+ @Override
+ public Map<String, String> hgetAll(String key) {
+ return jedisCluster.hgetAll(key);
+ }
+
+ @Override
+ public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+ jedisCluster.psubscribe(jedisPubSub, patterns);
+ }
+
+ @Override
+ public void disconnect() {
+ jedisCluster.close();
+ }
+
+ @Override
+ public void close() {
+ jedisCluster.close();
+ }
+
+ private Set<HostAndPort> getNodes(URL url) {
+ Set<HostAndPort> hostAndPorts = new HashSet<>();
+ hostAndPorts.add(new HostAndPort(url.getHost(), url.getPort()));
+ String backupAddresses = url.getBackupAddress(6379);
+ String[] nodes = StringUtils.isEmpty(backupAddresses) ? new String[0]
: COMMA_SPLIT_PATTERN.split(backupAddresses);
+ if (nodes.length > 0) {
+ for (String node : nodes) {
+ String[] hostAndPort = COLON_SPLIT_PATTERN.split(node);
+ hostAndPorts.add(new HostAndPort(hostAndPort[0],
Integer.valueOf(hostAndPort[1])));
+ }
+ }
+ return hostAndPorts;
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
new file mode 100644
index 0000000..d69c565
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+public class MonoRedisClient extends AbstractRedisClient implements
RedisClient {
+ private static final Logger logger =
LoggerFactory.getLogger(MonoRedisClient.class);
+
+ private static final String START_CURSOR = "0";
+
+ private JedisPool jedisPool;
+
+ public MonoRedisClient(URL url) {
+ super(url);
+ jedisPool = new JedisPool(getConfig(), url.getHost(), url.getPort(),
+ url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT),
url.getPassword());
+ }
+
+ @Override
+ public Long hset(String key, String field, String value) {
+ Jedis jedis = jedisPool.getResource();
+ Long result = jedis.hset(key, field, value);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Long publish(String channel, String message) {
+ Jedis jedis = jedisPool.getResource();
+ Long result = jedis.publish(channel, message);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public boolean isConnected() {
+ Jedis jedis = jedisPool.getResource();
+ boolean connected = jedis.isConnected();
+ jedis.close();
+ return connected;
+ }
+
+ @Override
+ public void destroy() {
+ jedisPool.close();
+ }
+
+ @Override
+ public Long hdel(String key, String... fields) {
+ Jedis jedis = jedisPool.getResource();
+ Long result = jedis.hdel(key, fields);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Set<String> scan(String pattern) {
+ Jedis jedis = jedisPool.getResource();
+ Set<String> result = super.scan(jedis, pattern);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Map<String, String> hgetAll(String key) {
+ Jedis jedis = jedisPool.getResource();
+ Map<String, String> result = jedis.hgetAll(key);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+ Jedis jedis = jedisPool.getResource();
+ jedis.psubscribe(jedisPubSub, patterns);
+ jedis.close();
+ }
+
+ @Override
+ public void disconnect() {
+ jedisPool.close();
+ }
+
+ @Override
+ public void close() {
+ jedisPool.close();
+ }
+
+
+}
diff --git
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
new file mode 100644
index 0000000..ba3c890
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.RemotingConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class SentinelRedisClient extends AbstractRedisClient implements
RedisClient {
+ private static final Logger logger =
LoggerFactory.getLogger(SentinelRedisClient.class);
+
+ private JedisSentinelPool sentinelPool;
+
+ public SentinelRedisClient(URL url) {
+ super(url);
+ String masterName = url.getParameter("master.name", "Sentinel-master");
+ String address = (new
StringBuilder()).append(url.getAddress()).toString();
+ String[] backupAddresses =
url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
+ if (backupAddresses.length == 0) {
+ throw new IllegalStateException("Sentinel addresses can not be
empty");
+ }
+ Set<String> sentinels = new HashSet<>(Arrays.asList(backupAddresses));
+ sentinels.add(address);
+ sentinelPool = new JedisSentinelPool(masterName, sentinels,
getConfig(), url.getPassword());
+ }
+
+ @Override
+ public Long hset(String key, String field, String value) {
+ Jedis jedis = sentinelPool.getResource();
+ Long result = jedis.hset(key, field, value);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Long publish(String channel, String message) {
+ Jedis jedis = sentinelPool.getResource();
+ Long result = jedis.publish(channel, message);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public boolean isConnected() {
+ Jedis jedis = sentinelPool.getResource();
+ boolean result = jedis.isConnected();
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public void destroy() {
+ sentinelPool.close();
+ }
+
+ @Override
+ public Long hdel(String key, String... fields) {
+ Jedis jedis = sentinelPool.getResource();
+ Long result = jedis.hdel(key, fields);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Set<String> scan(String pattern) {
+ Jedis jedis = sentinelPool.getResource();
+ Set<String> result = scan(jedis, pattern);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public Map<String, String> hgetAll(String key) {
+ Jedis jedis = sentinelPool.getResource();
+ Map<String, String> result = jedis.hgetAll(key);
+ jedis.close();
+ return result;
+ }
+
+ @Override
+ public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+ Jedis jedis = sentinelPool.getResource();
+ jedis.psubscribe(jedisPubSub, patterns);
+ jedis.close();
+ }
+
+ @Override
+ public void disconnect() {
+ sentinelPool.close();
+ }
+
+ @Override
+ public void close() {
+ sentinelPool.close();
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
new file mode 100644
index 0000000..e09fd1c
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractRedisClient implements RedisClient {
+ private URL url;
+
+ private JedisPoolConfig config;
+
+ public AbstractRedisClient(URL url) {
+ this.url = url;
+ config = new JedisPoolConfig();
+ config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
+ config.setTestOnReturn(url.getParameter("test.on.return", false));
+ config.setTestWhileIdle(url.getParameter("test.while.idle", false));
+ if (url.getParameter("max.idle", 0) > 0) {
+ config.setMaxIdle(url.getParameter("max.idle", 0));
+ }
+ if (url.getParameter("min.idle", 0) > 0) {
+ config.setMinIdle(url.getParameter("min.idle", 0));
+ }
+ if (url.getParameter("max.active", 0) > 0) {
+ config.setMaxTotal(url.getParameter("max.active", 0));
+ }
+ if (url.getParameter("max.total", 0) > 0) {
+ config.setMaxTotal(url.getParameter("max.total", 0));
+ }
+ if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
+ config.setMaxWaitMillis(url.getParameter("max.wait",
url.getParameter("timeout", 0)));
+ }
+ if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
+
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run",
0));
+ }
+ if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
+
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis",
0));
+ }
+ if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
+
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis",
0));
+ }
+ }
+
+ protected Set<String> scan(Jedis jedis, String pattern) {
+ Set<String> result = new HashSet<>();
+ String cursor = ScanParams.SCAN_POINTER_START;
+ ScanParams params = new ScanParams();
+ params.match(pattern);
+ while (true) {
+ ScanResult<String> scanResult = jedis.scan(cursor, params);
+ List<String> list = scanResult.getResult();
+ if (CollectionUtils.isNotEmpty(list)) {
+ result.addAll(list);
+ }
+ if (ScanParams.SCAN_POINTER_START.equals(scanResult.getCursor())) {
+ break;
+ }
+ cursor = scanResult.getCursor();
+ }
+ return result;
+ }
+
+ public URL getUrl() {
+ return url;
+ }
+
+ public JedisPoolConfig getConfig() {
+ return config;
+ }
+}
diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml
index 19970ad..1b0524e 100644
--- a/dubbo-remoting/pom.xml
+++ b/dubbo-remoting/pom.xml
@@ -40,5 +40,6 @@
<module>dubbo-remoting-zookeeper</module>
<module>dubbo-remoting-netty4</module>
<module>dubbo-remoting-etcd3</module>
+ <module>dubbo-remoting-redis</module>
</modules>
</project>
\ No newline at end of file