This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e84dcb8c10 [Feature][Connector-V2] Support 
TableSourceFactory/TableSinkFactory on redis  (#5901)
e84dcb8c10 is described below

commit e84dcb8c108d7bfe5a233d16bc4a830a0d952a3e
Author: lizhenglei <[email protected]>
AuthorDate: Fri Nov 24 17:54:11 2023 +0800

    [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on 
redis  (#5901)
---
 .../connectors/seatunnel/http/sink/HttpSink.java   |  2 +-
 .../seatunnel/redis/config/RedisConfig.java        | 16 +++--
 .../seatunnel/redis/config/RedisParameters.java    | 70 +++++++---------------
 .../connectors/seatunnel/redis/sink/RedisSink.java | 51 ++++------------
 .../seatunnel/redis/sink/RedisSinkFactory.java     |  9 +++
 .../seatunnel/redis/source/RedisSource.java        | 57 +++++++-----------
 .../seatunnel/redis/source/RedisSourceFactory.java | 18 +++++-
 7 files changed, 91 insertions(+), 132 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 140e24a4f0..1cf22b0164 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -75,7 +75,7 @@ public class HttpSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public String getPluginName() {
-        return "Http";
+        return HttpConfig.CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 7b0c20cbea..c8a0a02dc7 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -20,8 +20,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.redis.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.util.List;
+
 public class RedisConfig {
 
+    public static final String CONNECTOR_IDENTITY = "Redis";
+
     public enum RedisMode {
         SINGLE,
         CLUSTER;
@@ -38,8 +42,8 @@ public class RedisConfig {
                     .noDefaultValue()
                     .withDescription("redis hostname or ip");
 
-    public static final Option<String> PORT =
-            
Options.key("port").stringType().noDefaultValue().withDescription("redis port");
+    public static final Option<Integer> PORT =
+            
Options.key("port").intType().noDefaultValue().withDescription("redis port");
 
     public static final Option<String> AUTH =
             Options.key("auth")
@@ -75,9 +79,9 @@ public class RedisConfig {
                     .noDefaultValue()
                     .withDescription("The value of key you want to write to 
redis.");
 
-    public static final Option<String> DATA_TYPE =
+    public static final Option<RedisDataType> DATA_TYPE =
             Options.key("data_type")
-                    .stringType()
+                    .enumType(RedisDataType.class)
                     .noDefaultValue()
                     .withDescription("redis data types, support key hash list 
set zset.");
 
@@ -95,9 +99,9 @@ public class RedisConfig {
                     .withDescription(
                             "redis mode, support single or cluster, default 
value is single");
 
-    public static final Option<String> NODES =
+    public static final Option<List<String>> NODES =
             Options.key("nodes")
-                    .stringType()
+                    .listType()
                     .noDefaultValue()
                     .withDescription(
                             "redis nodes information, used in cluster mode, 
must like as the following format: [host1:port1, host2:port2]");
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 0e3d642e2d..d32f3bbb0d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 
@@ -50,64 +49,41 @@ public class RedisParameters implements Serializable {
     private List<String> redisNodes = Collections.emptyList();
     private long expire = RedisConfig.EXPIRE.defaultValue();
 
-    public void buildWithConfig(Config config) {
+    public void buildWithConfig(ReadonlyConfig config) {
         // set host
-        this.host = config.getString(RedisConfig.HOST.key());
+        this.host = config.get(RedisConfig.HOST);
         // set port
-        this.port = config.getInt(RedisConfig.PORT.key());
-        // set auth
-        if (config.hasPath(RedisConfig.AUTH.key())) {
-            this.auth = config.getString(RedisConfig.AUTH.key());
-        }
+        this.port = config.get(RedisConfig.PORT);
         // set db_num
-        if (config.hasPath(RedisConfig.DB_NUM.key())) {
-            this.dbNum = config.getInt(RedisConfig.DB_NUM.key());
+        this.dbNum = config.get(RedisConfig.DB_NUM);
+        // set hash key mode
+        this.hashKeyParseMode = config.get(RedisConfig.HASH_KEY_PARSE_MODE);
+        // set expire
+        this.expire = config.get(RedisConfig.EXPIRE);
+        // set auth
+        if (config.getOptional(RedisConfig.AUTH).isPresent()) {
+            this.auth = config.get(RedisConfig.AUTH);
         }
         // set user
-        if (config.hasPath(RedisConfig.USER.key())) {
-            this.user = config.getString(RedisConfig.USER.key());
+        if (config.getOptional(RedisConfig.USER).isPresent()) {
+            this.user = config.get(RedisConfig.USER);
         }
         // set mode
-        if (config.hasPath(RedisConfig.MODE.key())) {
-            this.mode =
-                    RedisConfig.RedisMode.valueOf(
-                            
config.getString(RedisConfig.MODE.key()).toUpperCase());
-        } else {
-            this.mode = RedisConfig.MODE.defaultValue();
-        }
-        // set hash key mode
-        if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE.key())) {
-            this.hashKeyParseMode =
-                    RedisConfig.HashKeyParseMode.valueOf(
-                            
config.getString(RedisConfig.HASH_KEY_PARSE_MODE.key()).toUpperCase());
-        } else {
-            this.hashKeyParseMode = 
RedisConfig.HASH_KEY_PARSE_MODE.defaultValue();
-        }
+        this.mode = config.get(RedisConfig.MODE);
         // set redis nodes information
-        if (config.hasPath(RedisConfig.NODES.key())) {
-            this.redisNodes = config.getStringList(RedisConfig.NODES.key());
+        if (config.getOptional(RedisConfig.NODES).isPresent()) {
+            this.redisNodes = config.get(RedisConfig.NODES);
         }
         // set key
-        if (config.hasPath(RedisConfig.KEY.key())) {
-            this.keyField = config.getString(RedisConfig.KEY.key());
+        if (config.getOptional(RedisConfig.KEY).isPresent()) {
+            this.keyField = config.get(RedisConfig.KEY);
         }
         // set keysPattern
-        if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
-            this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
-        }
-        if (config.hasPath(RedisConfig.EXPIRE.key())) {
-            this.expire = config.getLong(RedisConfig.EXPIRE.key());
-        }
-        // set redis data type
-        try {
-            String dataType = config.getString(RedisConfig.DATA_TYPE.key());
-            this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase());
-        } catch (IllegalArgumentException e) {
-            throw new RedisConnectorException(
-                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                    "Redis source connector only support these data types 
[key, hash, list, set, zset]",
-                    e);
+        if (config.getOptional(RedisConfig.KEY_PATTERN).isPresent()) {
+            this.keysPattern = config.get(RedisConfig.KEY_PATTERN);
         }
+        // set redis data type verification factory createAndPrepareSource
+        this.redisDataType = config.get(RedisConfig.DATA_TYPE);
     }
 
     public Jedis buildJedis() {
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index 381576c6c9..ac8c544703 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -17,61 +17,34 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
-import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
-
-import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 
-@AutoService(SeaTunnelSink.class)
 public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     private final RedisParameters redisParameters = new RedisParameters();
     private SeaTunnelRowType seaTunnelRowType;
-    private Config pluginConfig;
-
-    @Override
-    public String getPluginName() {
-        return "Redis";
+    private ReadonlyConfig readonlyConfig;
+    private CatalogTable catalogTable;
+
+    public RedisSink(ReadonlyConfig config, CatalogTable table) {
+        this.readonlyConfig = config;
+        this.catalogTable = table;
+        this.redisParameters.buildWithConfig(config);
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        RedisConfig.HOST.key(),
-                        RedisConfig.PORT.key(),
-                        RedisConfig.KEY.key(),
-                        RedisConfig.DATA_TYPE.key());
-        if (!result.isSuccess()) {
-            throw new RedisConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        this.redisParameters.buildWithConfig(pluginConfig);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public String getPluginName() {
+        return RedisConfig.CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index 22ae156874..c4768c0618 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -18,8 +18,11 @@
 package org.apache.seatunnel.connectors.seatunnel.redis.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
 
 import com.google.auto.service.AutoService;
@@ -31,6 +34,12 @@ public class RedisSinkFactory implements TableSinkFactory {
         return "Redis";
     }
 
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new RedisSink(context.getOptions(), catalogTable);
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index c7ad6e6de8..3818848fc1 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -17,20 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
@@ -40,40 +35,29 @@ import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
-import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+
+import java.util.List;
 
-@AutoService(SeaTunnelSource.class)
 public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private final RedisParameters redisParameters = new RedisParameters();
     private SeaTunnelRowType seaTunnelRowType;
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
+    private CatalogTable catalogTable;
+
     @Override
     public String getPluginName() {
-        return "Redis";
+        return RedisConfig.CONNECTOR_IDENTITY;
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        RedisConfig.HOST.key(),
-                        RedisConfig.PORT.key(),
-                        RedisConfig.KEY_PATTERN.key(),
-                        RedisConfig.DATA_TYPE.key());
-        if (!result.isSuccess()) {
-            throw new RedisConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.redisParameters.buildWithConfig(pluginConfig);
+    public RedisSource(ReadonlyConfig readonlyConfig) {
+
+        this.redisParameters.buildWithConfig(readonlyConfig);
         // TODO: use format SPI
         // default use json format
-        if (pluginConfig.hasPath(RedisConfig.FORMAT.key())) {
-            if (!pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
+        if (readonlyConfig.getOptional(RedisConfig.FORMAT).isPresent()) {
+            if 
(!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
                 throw new RedisConnectorException(
                         SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                         String.format(
@@ -83,17 +67,16 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
                                 "Must config schema when format parameter been 
config"));
             }
 
-            RedisConfig.Format format =
-                    RedisConfig.Format.valueOf(
-                            
pluginConfig.getString(RedisConfig.FORMAT.key()).toUpperCase());
+            RedisConfig.Format format = readonlyConfig.get(RedisConfig.FORMAT);
             if (RedisConfig.Format.JSON.equals(format)) {
-                this.seaTunnelRowType =
-                        
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+                this.catalogTable = 
CatalogTableUtil.buildWithConfig(readonlyConfig);
+                this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
                 this.deserializationSchema =
                         new JsonDeserializationSchema(false, false, 
seaTunnelRowType);
             }
         } else {
-            this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
+            this.catalogTable = CatalogTableUtil.buildSimpleTextTable();
+            this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
             this.deserializationSchema = null;
         }
     }
@@ -104,8 +87,8 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return seaTunnelRowType;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Lists.newArrayList(catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
index 02cab4a558..c4f9ac099e 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -19,13 +19,18 @@ package 
org.apache.seatunnel.connectors.seatunnel.redis.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class RedisSourceFactory implements TableSourceFactory {
     @Override
@@ -33,17 +38,26 @@ public class RedisSourceFactory implements 
TableSourceFactory {
         return "Redis";
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
RedisSource(context.getOptions());
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(
-                        RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, 
RedisConfig.DATA_TYPE)
+                        RedisConfig.HOST,
+                        RedisConfig.PORT,
+                        RedisConfig.KEY_PATTERN,
+                        RedisConfig.DATA_TYPE)
                 .optional(
                         RedisConfig.MODE,
                         RedisConfig.HASH_KEY_PARSE_MODE,
                         RedisConfig.AUTH,
                         RedisConfig.USER,
-                        RedisConfig.KEY_PATTERN)
+                        RedisConfig.KEY)
                 .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
                 .bundled(RedisConfig.FORMAT, TableSchemaOptions.SCHEMA)
                 .build();

Reply via email to