This is an automated email from the ASF dual-hosted git repository.

tswstarplanet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 392f159  [Dubbo-6536]Add a remoting redis to adapt mono, sentinel and 
cluster redis; refactor redis registry to use the new module (#6614)
392f159 is described below

commit 392f15952f46340130ba51b08a02536823a5880a
Author: tswstarplanet <tswstarpla...@apache.org>
AuthorDate: Sat Aug 22 01:13:39 2020 +0800

    [Dubbo-6536]Add a remoting redis to adapt mono, sentinel and cluster redis; 
refactor redis registry to use the new module (#6614)
    
    add remoting-redis module, refactor RedisRegistry
---
 .../src/main/java/org/apache/dubbo/common/URL.java |   4 -
 .../java/org/apache/dubbo/common/URLBuilder.java   |   3 -
 .../dubbo/common/constants/CommonConstants.java    |   9 +
 dubbo-dependencies-bom/pom.xml                     |   2 +-
 dubbo-metadata/dubbo-metadata-report-redis/pom.xml |   4 -
 dubbo-registry/dubbo-registry-redis/pom.xml        |   5 +
 .../apache/dubbo/registry/redis/RedisRegistry.java | 327 +++++++--------------
 .../dubbo/registry/redis/RedisRegistryTest.java    |   9 +-
 .../dubbo-remoting-redis}/pom.xml                  |  33 ++-
 .../apache/dubbo/remoting/redis/RedisClient.java   |  46 +++
 .../remoting/redis/jedis/ClusterRedisClient.java   | 134 +++++++++
 .../remoting/redis/jedis/MonoRedisClient.java      | 118 ++++++++
 .../remoting/redis/jedis/SentinelRedisClient.java  | 121 ++++++++
 .../redis/support/AbstractRedisClient.java         |  94 ++++++
 dubbo-remoting/pom.xml                             |   1 +
 15 files changed, 651 insertions(+), 259 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index 6ab3a89..c38ad58 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -199,10 +199,6 @@ class URL implements Serializable {
                String path,
                Map<String, String> parameters,
                Map<String, Map<String, String>> methodParameters) {
-        if (StringUtils.isEmpty(username)
-                && StringUtils.isNotEmpty(password)) {
-            throw new IllegalArgumentException("Invalid url, password without 
username!");
-        }
         this.protocol = protocol;
         this.username = username;
         this.password = password;
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
index 20c6c60..eeef637 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
@@ -125,9 +125,6 @@ public final class URLBuilder {
     }
 
     public URL build() {
-        if (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password)) 
{
-            throw new IllegalArgumentException("Invalid url, password without 
username!");
-        }
         port = port < 0 ? 0 : port;
         // trim the leading "/"
         int firstNonSlash = 0;
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index e2cc73d..1266d92 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -74,6 +74,8 @@ public interface CommonConstants {
 
     Pattern EQUAL_SPLIT_PATTERN = Pattern.compile("\\s*[=]+\\s*");
 
+    Pattern COLON_SPLIT_PATTERN = Pattern.compile("\\s*[:]+\\s*");
+
     String DEFAULT_PROXY = "javassist";
 
     String DEFAULT_DIRECTORY = "dubbo";
@@ -349,4 +351,11 @@ public interface CommonConstants {
      */
     String DEFAULT_SERVICE_NAME_MAPPING_PROPERTIES_PATH = 
"META-INF/dubbo/service-name-mapping.properties";
 
+    String REDIS_CLIENT_KEY = "redis-client";
+
+    String MONO_REDIS = "mono";
+
+    String SENTINEL_REDIS = "sentinel";
+
+    String CLUSTER_REDIS = "cluster";
 }
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 7a63e3c..ddae94e 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -101,7 +101,7 @@
         <zookeeper_version>3.4.13</zookeeper_version>
         <curator_version>4.0.1</curator_version>
         <curator_test_version>2.12.0</curator_test_version>
-        <jedis_version>2.9.0</jedis_version>
+        <jedis_version>3.3.0</jedis_version>
         <consul_version>1.4.2</consul_version>
         <consul_process_version>2.0.0</consul_process_version>
         <consul_client_version>1.3.7</consul_client_version>
diff --git a/dubbo-metadata/dubbo-metadata-report-redis/pom.xml 
b/dubbo-metadata/dubbo-metadata-report-redis/pom.xml
index f42424b..62893ba 100644
--- a/dubbo-metadata/dubbo-metadata-report-redis/pom.xml
+++ b/dubbo-metadata/dubbo-metadata-report-redis/pom.xml
@@ -24,9 +24,6 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>dubbo-metadata-report-redis</artifactId>
-    <properties>
-        <jedis.version>2.9.0</jedis.version>
-    </properties>
 
     <dependencies>
         <dependency>
@@ -37,7 +34,6 @@
         <dependency>
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
-            <version>${jedis.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
diff --git a/dubbo-registry/dubbo-registry-redis/pom.xml 
b/dubbo-registry/dubbo-registry-redis/pom.xml
index 0971cb8..63725af 100644
--- a/dubbo-registry/dubbo-registry-redis/pom.xml
+++ b/dubbo-registry/dubbo-registry-redis/pom.xml
@@ -36,6 +36,11 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-remoting-redis</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
         </dependency>
diff --git 
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
 
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
index 0938a39..dcf8752 100644
--- 
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
+++ 
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -18,25 +18,21 @@ package org.apache.dubbo.registry.redis;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.URLBuilder;
-import org.apache.dubbo.common.constants.RemotingConstants;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.common.utils.UrlUtils;
 import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.jedis.ClusterRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.MonoRedisClient;
+import org.apache.dubbo.remoting.redis.jedis.SentinelRedisClient;
 import org.apache.dubbo.rpc.RpcException;
 
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
 import redis.clients.jedis.JedisPubSub;
-import redis.clients.jedis.JedisSentinelPool;
-import redis.clients.util.Pool;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,10 +55,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
 import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
-import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_REDIS;
+import static org.apache.dubbo.common.constants.CommonConstants.MONO_REDIS;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static 
org.apache.dubbo.common.constants.CommonConstants.REDIS_CLIENT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.SENTINEL_REDIS;
 import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
 import static 
org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
 import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
@@ -93,7 +91,7 @@ public class RedisRegistry extends FailbackRegistry {
 
     private final String root;
 
-    private final Map<String, Pool<Jedis>> jedisPools = new 
ConcurrentHashMap<>();
+    private RedisClient redisClient;
 
     private final ConcurrentMap<String, Notifier> notifiers = new 
ConcurrentHashMap<>();
 
@@ -107,78 +105,18 @@ public class RedisRegistry extends FailbackRegistry {
 
     public RedisRegistry(URL url) {
         super(url);
+        String type = url.getParameter(REDIS_CLIENT_KEY, MONO_REDIS);
+        if (SENTINEL_REDIS.equals(type)) {
+            redisClient = new SentinelRedisClient(url);
+        } else if (CLUSTER_REDIS.equals(type)) {
+            redisClient = new ClusterRedisClient(url);
+        } else {
+            redisClient = new MonoRedisClient(url);
+        }
+
         if (url.isAnyHost()) {
             throw new IllegalStateException("registry address == null");
         }
-        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
-        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
-        config.setTestOnReturn(url.getParameter("test.on.return", false));
-        config.setTestWhileIdle(url.getParameter("test.while.idle", false));
-        if (url.getParameter("max.idle", 0) > 0) {
-            config.setMaxIdle(url.getParameter("max.idle", 0));
-        }
-        if (url.getParameter("min.idle", 0) > 0) {
-            config.setMinIdle(url.getParameter("min.idle", 0));
-        }
-        if (url.getParameter("max.active", 0) > 0) {
-            config.setMaxTotal(url.getParameter("max.active", 0));
-        }
-        if (url.getParameter("max.total", 0) > 0) {
-            config.setMaxTotal(url.getParameter("max.total", 0));
-        }
-        if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
-            config.setMaxWaitMillis(url.getParameter("max.wait", 
url.getParameter("timeout", 0)));
-        }
-        if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
-            
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 
0));
-        }
-        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
-            
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis",
 0));
-        }
-        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
-            
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis",
 0));
-        }
-
-        String cluster = url.getParameter("cluster", "failover");
-        if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
-            throw new IllegalArgumentException("Unsupported redis cluster: " + 
cluster + ". The redis cluster only supported failover or replicate.");
-        }
-        replicate = "replicate".equals(cluster);
-
-        List<String> addresses = new ArrayList<>();
-        addresses.add(url.getAddress());
-        String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new 
String[0]);
-        if (ArrayUtils.isNotEmpty(backups)) {
-            addresses.addAll(Arrays.asList(backups));
-        }
-        //获得Redis主节点名称
-        String masterName = url.getParameter(REDIS_MASTER_NAME_KEY);
-        if (StringUtils.isEmpty(masterName)) {
-            //单机版redis
-            for (String address : addresses) {
-                int i = address.indexOf(':');
-                String host;
-                int port;
-                if (i > 0) {
-                    host = address.substring(0, i);
-                    port = Integer.parseInt(address.substring(i + 1));
-                } else {
-                    host = address;
-                    port = DEFAULT_REDIS_PORT;
-                }
-                this.jedisPools.put(address, new JedisPool(config, host, port,
-                        url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), 
StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
-                        url.getParameter("db.index", 0)));
-            }
-        } else {
-            //哨兵版redis
-            Set<String> sentinelSet = new HashSet<>(addresses);
-            int index = url.getParameter("db.index", 0);
-            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
-            String password = StringUtils.isEmpty(url.getPassword()) ? null : 
url.getPassword();
-            JedisSentinelPool pool = new JedisSentinelPool(masterName, 
sentinelSet, config, timeout, password, index);
-            this.jedisPools.put(masterName, pool);
-        }
 
         this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, 
DEFAULT_REGISTRY_RECONNECT_PERIOD);
         String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
@@ -201,37 +139,24 @@ public class RedisRegistry extends FailbackRegistry {
     }
 
     private void deferExpired() {
-        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
-            Pool<Jedis> jedisPool = entry.getValue();
-            try {
-                try (Jedis jedis = jedisPool.getResource()) {
-                    for (URL url : new HashSet<>(getRegistered())) {
-                        if (url.getParameter(DYNAMIC_KEY, true)) {
-                            String key = toCategoryPath(url);
-                            if (jedis.hset(key, url.toFullString(), 
String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
-                                jedis.publish(key, REGISTER);
-                            }
-                        }
-                    }
-                    if (admin) {
-                        clean(jedis);
-                    }
-                    if (!replicate) {
-                        break;//  If the server side has synchronized data, 
just write a single machine
-                    }
+        for (URL url : new HashSet<>(getRegistered())) {
+            if (url.getParameter(DYNAMIC_KEY, true)) {
+                String key = toCategoryPath(url);
+                if (redisClient.hset(key, url.toFullString(), 
String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
+                    redisClient.publish(key, REGISTER);
                 }
-            } catch (Throwable t) {
-                logger.warn("Failed to write provider heartbeat to redis 
registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
             }
         }
+        if (admin) {
+            clean();
+        }
     }
 
-    // The monitoring center is responsible for deleting outdated dirty data
-    private void clean(Jedis jedis) {
-        Set<String> keys = jedis.keys(root + ANY_VALUE);
+    private void clean() {
+        Set<String> keys = redisClient.scan(root + ANY_VALUE);
         if (CollectionUtils.isNotEmpty(keys)) {
             for (String key : keys) {
-                Map<String, String> values = jedis.hgetAll(key);
+                Map<String, String> values = redisClient.hgetAll(key);
                 if (CollectionUtils.isNotEmptyMap(values)) {
                     boolean delete = false;
                     long now = System.currentTimeMillis();
@@ -240,7 +165,7 @@ public class RedisRegistry extends FailbackRegistry {
                         if (url.getParameter(DYNAMIC_KEY, true)) {
                             long expire = Long.parseLong(entry.getValue());
                             if (expire < now) {
-                                jedis.hdel(key, entry.getKey());
+                                redisClient.hdel(key, entry.getKey());
                                 delete = true;
                                 if (logger.isWarnEnabled()) {
                                     logger.warn("Delete expired key: " + key + 
" -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + 
new Date(now));
@@ -249,7 +174,7 @@ public class RedisRegistry extends FailbackRegistry {
                         }
                     }
                     if (delete) {
-                        jedis.publish(key, UNREGISTER);
+                        redisClient.publish(key, UNREGISTER);
                     }
                 }
             }
@@ -258,15 +183,7 @@ public class RedisRegistry extends FailbackRegistry {
 
     @Override
     public boolean isAvailable() {
-        for (Pool<Jedis> jedisPool : jedisPools.values()) {
-            try (Jedis jedis = jedisPool.getResource()) {
-                if (jedis.isConnected()) {
-                    return true; // At least one single machine is available.
-                }
-            } catch (Throwable t) {
-            }
-        }
-        return false;
+        return redisClient.isConnected();
     }
 
     @Override
@@ -284,13 +201,10 @@ public class RedisRegistry extends FailbackRegistry {
         } catch (Throwable t) {
             logger.warn(t.getMessage(), t);
         }
-        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
-            Pool<Jedis> jedisPool = entry.getValue();
-            try {
-                jedisPool.destroy();
-            } catch (Throwable t) {
-                logger.warn("Failed to destroy the redis registry client. 
registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
-            }
+        try {
+            redisClient.destroy();
+        } catch (Throwable t) {
+            logger.warn("Failed to destroy the redis registry client. 
registry: " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
         }
         ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
     }
@@ -302,21 +216,13 @@ public class RedisRegistry extends FailbackRegistry {
         String expire = String.valueOf(System.currentTimeMillis() + 
expirePeriod);
         boolean success = false;
         RpcException exception = null;
-        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
-            Pool<Jedis> jedisPool = entry.getValue();
-            try {
-                try (Jedis jedis = jedisPool.getResource()) {
-                    jedis.hset(key, value, expire);
-                    jedis.publish(key, REGISTER);
-                    success = true;
-                    if (!replicate) {
-                        break; //  If the server side has synchronized data, 
just write a single machine
-                    }
-                }
-            } catch (Throwable t) {
-                exception = new RpcException("Failed to register service to 
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: 
" + t.getMessage(), t);
-            }
+        try {
+            redisClient.hset(key, value, expire);
+            redisClient.publish(key, REGISTER);
+        } catch (Throwable t) {
+            exception = new RpcException("Failed to register service to redis 
registry. registry: " + url.getAddress() + ", service: " + url + ", cause: " + 
t.getMessage(), t);
         }
+
         if (exception != null) {
             if (success) {
                 logger.warn(exception.getMessage(), exception);
@@ -332,21 +238,14 @@ public class RedisRegistry extends FailbackRegistry {
         String value = url.toFullString();
         RpcException exception = null;
         boolean success = false;
-        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
-            Pool<Jedis> jedisPool = entry.getValue();
-            try {
-                try (Jedis jedis = jedisPool.getResource()) {
-                    jedis.hdel(key, value);
-                    jedis.publish(key, UNREGISTER);
-                    success = true;
-                    if (!replicate) {
-                        break; //  If the server side has synchronized data, 
just write a single machine
-                    }
-                }
-            } catch (Throwable t) {
-                exception = new RpcException("Failed to unregister service to 
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: 
" + t.getMessage(), t);
-            }
+        try {
+            redisClient.hdel(key, value);
+            redisClient.publish(key, UNREGISTER);
+            success = true;
+        } catch (Throwable t) {
+            exception = new RpcException("Failed to unregister service to 
redis registry. registry: " + url.getAddress() + ", service: " + url + ", 
cause: " + t.getMessage(), t);
         }
+
         if (exception != null) {
             if (success) {
                 logger.warn(exception.getMessage(), exception);
@@ -370,33 +269,27 @@ public class RedisRegistry extends FailbackRegistry {
         }
         boolean success = false;
         RpcException exception = null;
-        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
-            Pool<Jedis> jedisPool = entry.getValue();
-            try {
-                try (Jedis jedis = jedisPool.getResource()) {
-                    if (service.endsWith(ANY_VALUE)) {
-                        admin = true;
-                        Set<String> keys = jedis.keys(service);
-                        if (CollectionUtils.isNotEmpty(keys)) {
-                            Map<String, Set<String>> serviceKeys = new 
HashMap<>();
-                            for (String key : keys) {
-                                String serviceKey = toServicePath(key);
-                                Set<String> sk = 
serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
-                                sk.add(key);
-                            }
-                            for (Set<String> sk : serviceKeys.values()) {
-                                doNotify(jedis, sk, url, 
Collections.singletonList(listener));
-                            }
-                        }
-                    } else {
-                        doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + 
ANY_VALUE), url, Collections.singletonList(listener));
+        try {
+            if (service.endsWith(ANY_VALUE)) {
+                admin = true;
+                Set<String> keys = redisClient.scan(service);
+                if (CollectionUtils.isNotEmpty(keys)) {
+                    Map<String, Set<String>> serviceKeys = new HashMap<>();
+                    for (String key : keys) {
+                        String serviceKey = toServicePath(key);
+                        Set<String> sk = 
serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
+                        sk.add(key);
+                    }
+                    for (Set<String> sk : serviceKeys.values()) {
+                        doNotify(sk, url, Collections.singletonList(listener));
                     }
-                    success = true;
-                    break; // Just read one server's data
                 }
-            } catch (Throwable t) { // Try the next server
-                exception = new RpcException("Failed to subscribe service from 
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: 
" + t.getMessage(), t);
+            } else {
+                doNotify(redisClient.scan(service + PATH_SEPARATOR + 
ANY_VALUE), url, Collections.singletonList(listener));
             }
+            success = true;
+        } catch (Throwable t) {
+            exception = new RpcException("Failed to subscribe service from 
redis registry. registry: " + url.getAddress() + ", service: " + url + ", 
cause: " + t.getMessage(), t);
         }
         if (exception != null) {
             if (success) {
@@ -411,13 +304,13 @@ public class RedisRegistry extends FailbackRegistry {
     public void doUnsubscribe(URL url, NotifyListener listener) {
     }
 
-    private void doNotify(Jedis jedis, String key) {
+    private void doNotify(String key) {
         for (Map.Entry<URL, Set<NotifyListener>> entry : new 
HashMap<>(getSubscribed()).entrySet()) {
-            doNotify(jedis, Collections.singletonList(key), entry.getKey(), 
new HashSet<>(entry.getValue()));
+            doNotify(Collections.singletonList(key), entry.getKey(), new 
HashSet<>(entry.getValue()));
         }
     }
 
-    private void doNotify(Jedis jedis, Collection<String> keys, URL url, 
Collection<NotifyListener> listeners) {
+    private void doNotify(Collection<String> keys, URL url, 
Collection<NotifyListener> listeners) {
         if (keys == null || keys.isEmpty()
                 || listeners == null || listeners.isEmpty()) {
             return;
@@ -438,7 +331,7 @@ public class RedisRegistry extends FailbackRegistry {
                 continue;
             }
             List<URL> urls = new ArrayList<>();
-            Map<String, String> values = jedis.hgetAll(key);
+            Map<String, String> values = redisClient.hgetAll(key);
             if (CollectionUtils.isNotEmptyMap(values)) {
                 for (Map.Entry<String, String> entry : values.entrySet()) {
                     URL u = URL.valueOf(entry.getKey());
@@ -500,12 +393,7 @@ public class RedisRegistry extends FailbackRegistry {
     }
 
     private class NotifySub extends JedisPubSub {
-
-        private final Pool<Jedis> jedisPool;
-
-        public NotifySub(Pool<Jedis> jedisPool) {
-            this.jedisPool = jedisPool;
-        }
+        public NotifySub() {}
 
         @Override
         public void onMessage(String key, String msg) {
@@ -515,12 +403,7 @@ public class RedisRegistry extends FailbackRegistry {
             if (msg.equals(REGISTER)
                     || msg.equals(UNREGISTER)) {
                 try {
-                    Jedis jedis = jedisPool.getResource();
-                    try {
-                        doNotify(jedis, key);
-                    } finally {
-                        jedis.close();
-                    }
+                    doNotify(key);
                 } catch (Throwable t) { // TODO Notification failure does not 
restore mechanism guarantee
                     logger.error(t.getMessage(), t);
                 }
@@ -555,7 +438,7 @@ public class RedisRegistry extends FailbackRegistry {
         private final String service;
         private final AtomicInteger connectSkip = new AtomicInteger();
         private final AtomicInteger connectSkipped = new AtomicInteger();
-        private volatile Jedis jedis;
+
         private volatile boolean first = true;
         private volatile boolean running = true;
         private volatile int connectRandom;
@@ -595,46 +478,34 @@ public class RedisRegistry extends FailbackRegistry {
                 try {
                     if (!isSkip()) {
                         try {
-                            for (Map.Entry<String, Pool<Jedis>> entry : 
jedisPools.entrySet()) {
-                                Pool<Jedis> jedisPool = entry.getValue();
-                                try {
-                                    if (jedisPool.isClosed()) {
-                                        continue;
-                                    }
-                                    jedis = jedisPool.getResource();
-                                    if (!jedis.isConnected()) {
-                                        continue;
-                                    }
-                                    try {
-                                        if (service.endsWith(ANY_VALUE)) {
-                                            if (first) {
-                                                first = false;
-                                                Set<String> keys = 
jedis.keys(service);
-                                                if 
(CollectionUtils.isNotEmpty(keys)) {
-                                                    for (String s : keys) {
-                                                        doNotify(jedis, s);
-                                                    }
-                                                }
-                                                resetSkip();
-                                            }
-                                            jedis.psubscribe(new 
NotifySub(jedisPool), service); // blocking
-                                        } else {
-                                            if (first) {
-                                                first = false;
-                                                doNotify(jedis, service);
-                                                resetSkip();
+                            if (!redisClient.isConnected()) {
+                                continue;
+                            }
+                            try {
+                                if (service.endsWith(ANY_VALUE)) {
+                                    if (first) {
+                                        first = false;
+                                        Set<String> keys = 
redisClient.scan(service);
+                                        if (CollectionUtils.isNotEmpty(keys)) {
+                                            for (String s : keys) {
+                                                doNotify(s);
                                             }
-                                            jedis.psubscribe(new 
NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking
                                         }
-                                        break;
-                                    } finally {
-                                        jedis.close();
+                                        resetSkip();
+                                    }
+                                    redisClient.psubscribe(new NotifySub(), 
service);
+                                } else {
+                                    if (first) {
+                                        first = false;
+                                        doNotify(service);
+                                        resetSkip();
                                     }
-                                } catch (Throwable t) { // Retry another server
-                                    logger.warn("Failed to subscribe service 
from redis registry. registry: " + entry.getKey() + ", cause: " + 
t.getMessage(), t);
-                                    // If you only have a single redis, you 
need to take a rest to avoid overtaking a lot of CPU resources
-                                    sleep(reconnectPeriod);
+                                    redisClient.psubscribe(new NotifySub(), 
service + PATH_SEPARATOR + ANY_VALUE); // blocking
                                 }
+                            } catch (Throwable t) { // Retry another server
+                                logger.warn("Failed to subscribe service from 
redis registry. registry: " + getUrl().getAddress() + ", cause: " + 
t.getMessage(), t);
+                                // If you only have a single redis, you need 
to take a rest to avoid overtaking a lot of CPU resources
+                                sleep(reconnectPeriod);
                             }
                         } catch (Throwable t) {
                             logger.error(t.getMessage(), t);
@@ -650,7 +521,7 @@ public class RedisRegistry extends FailbackRegistry {
         public void shutdown() {
             try {
                 running = false;
-                jedis.disconnect();
+                redisClient.disconnect();
             } catch (Throwable t) {
                 logger.warn(t.getMessage(), t);
             }
diff --git 
a/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
 
b/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
index 755e452..c41fe24 100644
--- 
a/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
+++ 
b/dubbo-registry/dubbo-registry-redis/src/test/java/org/apache/dubbo/registry/redis/RedisRegistryTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 import redis.embedded.RedisServer;
 import redis.embedded.RedisServerBuilder;
 
@@ -36,6 +37,7 @@ import java.util.Set;
 import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class RedisRegistryTest {
 
@@ -80,7 +82,7 @@ public class RedisRegistryTest {
 
     @Test
     public void testAnyHost() {
-        Assertions.assertThrows(IllegalStateException.class, () -> {
+        assertThrows(IllegalStateException.class, () -> {
             URL errorUrl = URL.valueOf("multicast://0.0.0.0/");
             new RedisRegistryFactory().createRegistry(errorUrl);
         });
@@ -112,7 +114,7 @@ public class RedisRegistryTest {
         assertThat(redisRegistry.isAvailable(), is(true));
 
         redisRegistry.destroy();
-        assertThat(redisRegistry.isAvailable(), is(false));
+        assertThrows(JedisConnectionException.class, () -> 
redisRegistry.isAvailable());
     }
 
     @Test
@@ -120,7 +122,8 @@ public class RedisRegistryTest {
         URL url = 
URL.valueOf("redis://redisOne:8880").addParameter(BACKUP_KEY, "redisTwo:8881");
         Registry registry = new RedisRegistryFactory().createRegistry(url);
 
-        assertThat(registry.isAvailable(), is(false));
+        Registry finalRegistry = registry;
+        assertThrows(JedisConnectionException.class, () -> 
finalRegistry.isAvailable());
 
         url = 
URL.valueOf(this.registryUrl.toFullString()).addParameter(BACKUP_KEY, 
"redisTwo:8881");
         registry = new RedisRegistryFactory().createRegistry(url);
diff --git a/dubbo-registry/dubbo-registry-redis/pom.xml 
b/dubbo-remoting/dubbo-remoting-redis/pom.xml
similarity index 72%
copy from dubbo-registry/dubbo-registry-redis/pom.xml
copy to dubbo-remoting/dubbo-remoting-redis/pom.xml
index 0971cb8..b30f6a7 100644
--- a/dubbo-registry/dubbo-registry-redis/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-redis/pom.xml
@@ -14,40 +14,41 @@
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
+        <artifactId>dubbo-remoting</artifactId>
         <groupId>org.apache.dubbo</groupId>
-        <artifactId>dubbo-registry</artifactId>
         <version>${revision}</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <artifactId>dubbo-registry-redis</artifactId>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dubbo-remoting-redis</artifactId>
+
     <packaging>jar</packaging>
+
     <name>${project.artifactId}</name>
-    <description>The redis registry module of dubbo project</description>
+    <description>The redis remoting module of dubbo project</description>
+
     <properties>
         <skip_maven_deploy>false</skip_maven_deploy>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
-            <artifactId>dubbo-registry-api</artifactId>
+            <artifactId>dubbo-remoting-api</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.github.kstyrc</groupId>
-            <artifactId>embedded-redis</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+            <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <scope>test</scope>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
new file mode 100644
index 0000000..7b2d7b8
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/RedisClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis;
+
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface RedisClient {
+    Long hset(String key, String field, String value);
+
+    Long publish(String channel, String message);
+
+//    void clean(String pattern);
+
+    boolean isConnected();
+
+    void destroy();
+
+    Long hdel(final String key, final String... fields);
+
+    Set<String> scan(String pattern);
+
+    Map<String, String> hgetAll(String key);
+
+    void psubscribe(final JedisPubSub jedisPubSub, final String... patterns);
+
+    void disconnect();
+
+    void close();
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
new file mode 100644
index 0000000..b4b104e
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/ClusterRedisClient.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.COLON_SPLIT_PATTERN;
+import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+
+public class ClusterRedisClient extends AbstractRedisClient implements 
RedisClient {
+    private static final Logger logger = 
LoggerFactory.getLogger(ClusterRedisClient.class);
+
+    private static final int DEFAULT_TIMEOUT = 2000;
+
+    private static final int DEFAULT_SO_TIMEOUT = 2000;
+
+    private static final int DEFAULT_MAX_ATTEMPTS = 5;
+
+    private JedisCluster jedisCluster;
+
+    public ClusterRedisClient(URL url) {
+        super(url);
+        Set<HostAndPort> nodes = getNodes(url);
+        jedisCluster = new JedisCluster(nodes, 
url.getParameter("connection.timeout", DEFAULT_TIMEOUT),
+                url.getParameter("so.timeout", DEFAULT_SO_TIMEOUT), 
url.getParameter("max.attempts", DEFAULT_MAX_ATTEMPTS),
+                url.getPassword(), getConfig());
+    }
+
+    @Override
+    public Long hset(String key, String field, String value) {
+        return jedisCluster.hset(key, field, value);
+    }
+
+    @Override
+    public Long publish(String channel, String message) {
+        return jedisCluster.publish(channel, message);
+    }
+
+    @Override
+    public boolean isConnected() {
+        Map<String, JedisPool> poolMap = jedisCluster.getClusterNodes();
+        for (JedisPool jedisPool : poolMap.values()) {
+            Jedis jedis = jedisPool.getResource();
+            if (jedis.isConnected()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void destroy() {
+        jedisCluster.close();
+    }
+
+    @Override
+    public Long hdel(String key, String... fields) {
+        return jedisCluster.hdel(key, fields);
+    }
+
+    @Override
+    public Set<String> scan(String pattern) {
+        Map<String, JedisPool> nodes = jedisCluster.getClusterNodes();
+        Set<String> result = new HashSet<>();
+        for (JedisPool jedisPool : nodes.values()) {
+            Jedis jedis = jedisPool.getResource();
+            result.addAll(scan(jedis, pattern));
+            jedis.close();
+        }
+        return result;
+    }
+
+    @Override
+    public Map<String, String> hgetAll(String key) {
+        return jedisCluster.hgetAll(key);
+    }
+
+    @Override
+    public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+        jedisCluster.psubscribe(jedisPubSub, patterns);
+    }
+
+    @Override
+    public void disconnect() {
+        jedisCluster.close();
+    }
+
+    @Override
+    public void close() {
+        jedisCluster.close();
+    }
+
+    private Set<HostAndPort> getNodes(URL url) {
+        Set<HostAndPort> hostAndPorts = new HashSet<>();
+        hostAndPorts.add(new HostAndPort(url.getHost(), url.getPort()));
+        String backupAddresses = url.getBackupAddress(6379);
+        String[] nodes = StringUtils.isEmpty(backupAddresses) ? new String[0] 
: COMMA_SPLIT_PATTERN.split(backupAddresses);
+        if (nodes.length > 0) {
+            for (String node : nodes) {
+                String[] hostAndPort = COLON_SPLIT_PATTERN.split(node);
+                hostAndPorts.add(new HostAndPort(hostAndPort[0], 
Integer.valueOf(hostAndPort[1])));
+            }
+        }
+        return hostAndPorts;
+    }
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
new file mode 100644
index 0000000..d69c565
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/MonoRedisClient.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+
+public class MonoRedisClient extends AbstractRedisClient implements 
RedisClient {
+    private static final Logger logger = 
LoggerFactory.getLogger(MonoRedisClient.class);
+
+    private static final String START_CURSOR = "0";
+
+    private JedisPool jedisPool;
+
+    public MonoRedisClient(URL url) {
+        super(url);
+        jedisPool = new JedisPool(getConfig(), url.getHost(), url.getPort(),
+                url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), 
url.getPassword());
+    }
+
+    @Override
+    public Long hset(String key, String field, String value) {
+        Jedis jedis = jedisPool.getResource();
+        Long result = jedis.hset(key, field, value);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Long publish(String channel, String message) {
+        Jedis jedis = jedisPool.getResource();
+        Long result = jedis.publish(channel, message);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public boolean isConnected() {
+        Jedis jedis = jedisPool.getResource();
+        boolean connected = jedis.isConnected();
+        jedis.close();
+        return connected;
+    }
+
+    @Override
+    public void destroy() {
+        jedisPool.close();
+    }
+
+    @Override
+    public Long hdel(String key, String... fields) {
+        Jedis jedis = jedisPool.getResource();
+        Long result = jedis.hdel(key, fields);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Set<String> scan(String pattern) {
+        Jedis jedis = jedisPool.getResource();
+        Set<String> result = super.scan(jedis, pattern);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Map<String, String> hgetAll(String key) {
+        Jedis jedis = jedisPool.getResource();
+        Map<String, String> result = jedis.hgetAll(key);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+        Jedis jedis = jedisPool.getResource();
+        jedis.psubscribe(jedisPubSub, patterns);
+        jedis.close();
+    }
+
+    @Override
+    public void disconnect() {
+        jedisPool.close();
+    }
+
+    @Override
+    public void close() {
+        jedisPool.close();
+    }
+
+
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
new file mode 100644
index 0000000..ba3c890
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/jedis/SentinelRedisClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.jedis;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.RemotingConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import org.apache.dubbo.remoting.redis.support.AbstractRedisClient;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class SentinelRedisClient extends AbstractRedisClient implements 
RedisClient {
+    private static final Logger logger = 
LoggerFactory.getLogger(SentinelRedisClient.class);
+
+    private JedisSentinelPool sentinelPool;
+
+    public SentinelRedisClient(URL url) {
+        super(url);
+        String masterName = url.getParameter("master.name", "Sentinel-master");
+        String address = (new 
StringBuilder()).append(url.getAddress()).toString();
+        String[] backupAddresses = 
url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
+        if (backupAddresses.length == 0) {
+            throw new IllegalStateException("Sentinel addresses can not be 
empty");
+        }
+        Set<String> sentinels = new HashSet<>(Arrays.asList(backupAddresses));
+        sentinels.add(address);
+        sentinelPool = new JedisSentinelPool(masterName, sentinels, 
getConfig(), url.getPassword());
+    }
+
+    @Override
+    public Long hset(String key, String field, String value) {
+        Jedis jedis = sentinelPool.getResource();
+        Long result = jedis.hset(key, field, value);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Long publish(String channel, String message) {
+        Jedis jedis = sentinelPool.getResource();
+        Long result = jedis.publish(channel, message);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public boolean isConnected() {
+        Jedis jedis = sentinelPool.getResource();
+        boolean result = jedis.isConnected();
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public void destroy() {
+        sentinelPool.close();
+    }
+
+    @Override
+    public Long hdel(String key, String... fields) {
+        Jedis jedis = sentinelPool.getResource();
+        Long result = jedis.hdel(key, fields);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Set<String> scan(String pattern) {
+        Jedis jedis = sentinelPool.getResource();
+        Set<String> result = scan(jedis, pattern);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public Map<String, String> hgetAll(String key) {
+        Jedis jedis = sentinelPool.getResource();
+        Map<String, String> result = jedis.hgetAll(key);
+        jedis.close();
+        return result;
+    }
+
+    @Override
+    public void psubscribe(JedisPubSub jedisPubSub, String... patterns) {
+        Jedis jedis = sentinelPool.getResource();
+        jedis.psubscribe(jedisPubSub, patterns);
+        jedis.close();
+    }
+
+    @Override
+    public void disconnect() {
+        sentinelPool.close();
+    }
+
+    @Override
+    public void close() {
+        sentinelPool.close();
+    }
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
new file mode 100644
index 0000000..e09fd1c
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-redis/src/main/java/org/apache/dubbo/remoting/redis/support/AbstractRedisClient.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.redis.support;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.remoting.redis.RedisClient;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractRedisClient implements RedisClient {
+    private URL url;
+
+    private JedisPoolConfig config;
+
+    public AbstractRedisClient(URL url) {
+        this.url = url;
+        config = new JedisPoolConfig();
+        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
+        config.setTestOnReturn(url.getParameter("test.on.return", false));
+        config.setTestWhileIdle(url.getParameter("test.while.idle", false));
+        if (url.getParameter("max.idle", 0) > 0) {
+            config.setMaxIdle(url.getParameter("max.idle", 0));
+        }
+        if (url.getParameter("min.idle", 0) > 0) {
+            config.setMinIdle(url.getParameter("min.idle", 0));
+        }
+        if (url.getParameter("max.active", 0) > 0) {
+            config.setMaxTotal(url.getParameter("max.active", 0));
+        }
+        if (url.getParameter("max.total", 0) > 0) {
+            config.setMaxTotal(url.getParameter("max.total", 0));
+        }
+        if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
+            config.setMaxWaitMillis(url.getParameter("max.wait", 
url.getParameter("timeout", 0)));
+        }
+        if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
+            
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 
0));
+        }
+        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
+            
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis",
 0));
+        }
+        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
+            
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis",
 0));
+        }
+    }
+
+    protected Set<String> scan(Jedis jedis, String pattern) {
+        Set<String> result = new HashSet<>();
+        String cursor = ScanParams.SCAN_POINTER_START;
+        ScanParams params = new ScanParams();
+        params.match(pattern);
+        while (true) {
+            ScanResult<String> scanResult = jedis.scan(cursor, params);
+            List<String> list = scanResult.getResult();
+            if (CollectionUtils.isNotEmpty(list)) {
+                result.addAll(list);
+            }
+            if (ScanParams.SCAN_POINTER_START.equals(scanResult.getCursor())) {
+                break;
+            }
+            cursor = scanResult.getCursor();
+        }
+        return result;
+    }
+
+    public URL getUrl() {
+        return url;
+    }
+
+    public JedisPoolConfig getConfig() {
+        return config;
+    }
+}
diff --git a/dubbo-remoting/pom.xml b/dubbo-remoting/pom.xml
index 19970ad..1b0524e 100644
--- a/dubbo-remoting/pom.xml
+++ b/dubbo-remoting/pom.xml
@@ -40,5 +40,6 @@
         <module>dubbo-remoting-zookeeper</module>
         <module>dubbo-remoting-netty4</module>
         <module>dubbo-remoting-etcd3</module>
+        <module>dubbo-remoting-redis</module>
     </modules>
 </project>
\ No newline at end of file

Reply via email to