This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f1c313eea6 [improve] update Redis connector config option (#8631)
f1c313eea6 is described below
commit f1c313eea6b8635dd928d44a73c3d86fe40a4346
Author: Asish <[email protected]>
AuthorDate: Tue Feb 11 19:52:15 2025 +0530
[improve] update Redis connector config option (#8631)
Co-authored-by: asishupadhyay <[email protected]>
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../{RedisConfig.java => RedisBaseOptions.java} | 56 +++----------------
.../seatunnel/redis/config/RedisParameters.java | 62 +++++++++++-----------
.../seatunnel/redis/config/RedisSinkOptions.java | 54 +++++++++++++++++++
.../seatunnel/redis/config/RedisSourceOptions.java | 35 ++++++++++++
.../connectors/seatunnel/redis/sink/RedisSink.java | 4 +-
.../seatunnel/redis/sink/RedisSinkFactory.java | 33 +++++++-----
.../seatunnel/redis/source/RedisSource.java | 10 ++--
.../seatunnel/redis/source/RedisSourceFactory.java | 28 +++++-----
.../seatunnel/redis/source/RedisSourceReader.java | 4 +-
10 files changed, 171 insertions(+), 117 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index e11291c418..9cf980f4bd 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -185,7 +185,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("PaimonSinkOptions");
whiteList.add("TDengineSourceOptions");
whiteList.add("PulsarSourceOptions");
- whiteList.add("RedisSinkOptions");
whiteList.add("FakeSourceOptions");
whiteList.add("HbaseSinkOptions");
whiteList.add("MongodbSinkOptions");
@@ -230,7 +229,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("SocketSourceOptions");
whiteList.add("OpenMldbSourceOptions");
whiteList.add("Web3jSourceOptions");
- whiteList.add("RedisSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
similarity index 67%
rename from seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
rename to seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
index c9809868dc..892d110cc2 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
import java.util.List;
-public class RedisConfig {
+public class RedisBaseOptions {
public static final String CONNECTOR_IDENTITY = "Redis";
@@ -31,11 +31,6 @@ public class RedisConfig {
CLUSTER;
}
- public enum HashKeyParseMode {
- ALL,
- KV;
- }
-
public static final Option<String> HOST =
Options.key("host")
.stringType()
@@ -85,16 +80,16 @@ public class RedisConfig {
.noDefaultValue()
.withDescription("redis data types, support string hash
list set zset.");
- public static final Option<RedisConfig.Format> FORMAT =
+ public static final Option<RedisBaseOptions.Format> FORMAT =
Options.key("format")
- .enumType(RedisConfig.Format.class)
- .defaultValue(RedisConfig.Format.JSON)
+ .enumType(RedisBaseOptions.Format.class)
+ .defaultValue(RedisBaseOptions.Format.JSON)
.withDescription(
"the format of upstream data, now only support
json and text, default json.");
- public static final Option<RedisConfig.RedisMode> MODE =
+ public static final Option<RedisBaseOptions.RedisMode> MODE =
Options.key("mode")
- .enumType(RedisConfig.RedisMode.class)
+ .enumType(RedisBaseOptions.RedisMode.class)
.defaultValue(RedisMode.SINGLE)
.withDescription(
"redis mode, support single or cluster, default
value is single");
@@ -106,19 +101,6 @@ public class RedisConfig {
.withDescription(
"redis nodes information, used in cluster mode,
must like as the following format: [host1:port1, host2:port2]");
- public static final Option<RedisConfig.HashKeyParseMode>
HASH_KEY_PARSE_MODE =
- Options.key("hash_key_parse_mode")
- .enumType(RedisConfig.HashKeyParseMode.class)
- .defaultValue(HashKeyParseMode.ALL)
- .withDescription(
- "hash key parse mode, support all or kv, default
value is all");
-
- public static final Option<Long> EXPIRE =
- Options.key("expire")
- .longType()
- .defaultValue(-1L)
- .withDescription("Set redis expiration time.");
-
public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
@@ -127,32 +109,6 @@ public class RedisConfig {
"batch_size is used to control the size of a batch
of data during read and write operations"
+ ",default 10");
- public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
- Options.key("support_custom_key")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "if true, the key can be customized by the field
value in the upstream data.");
-
- public static final Option<String> VALUE_FIELD =
- Options.key("value_field")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The field of value you want to write to redis,
support string list set zset");
-
- public static final Option<String> HASH_KEY_FIELD =
- Options.key("hash_key_field")
- .stringType()
- .noDefaultValue()
- .withDescription("The field of hash key you want to write
to redis");
-
- public static final Option<String> HASH_VALUE_FIELD =
- Options.key("hash_value_field")
- .stringType()
- .noDefaultValue()
- .withDescription("The field of hash value you want to
write to redis");
-
public enum Format {
JSON,
// TEXT will be supported later
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 6dff3cba71..4425be182f 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -52,11 +52,11 @@ public class RedisParameters implements Serializable {
private String keysPattern;
private String keyField;
private RedisDataType redisDataType;
- private RedisConfig.RedisMode mode;
- private RedisConfig.HashKeyParseMode hashKeyParseMode;
+ private RedisBaseOptions.RedisMode mode;
+ private RedisSourceOptions.HashKeyParseMode hashKeyParseMode;
private List<String> redisNodes = Collections.emptyList();
- private long expire = RedisConfig.EXPIRE.defaultValue();
- private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
+ private long expire = RedisSinkOptions.EXPIRE.defaultValue();
+ private int batchSize = RedisBaseOptions.BATCH_SIZE.defaultValue();
private Boolean supportCustomKey;
private String valueField;
private String hashKeyField;
@@ -66,63 +66,63 @@ public class RedisParameters implements Serializable {
public void buildWithConfig(ReadonlyConfig config) {
// set host
- this.host = config.get(RedisConfig.HOST);
+ this.host = config.get(RedisBaseOptions.HOST);
// set port
- this.port = config.get(RedisConfig.PORT);
+ this.port = config.get(RedisBaseOptions.PORT);
// set db_num
- this.dbNum = config.get(RedisConfig.DB_NUM);
+ this.dbNum = config.get(RedisBaseOptions.DB_NUM);
// set hash key mode
- this.hashKeyParseMode = config.get(RedisConfig.HASH_KEY_PARSE_MODE);
+ this.hashKeyParseMode =
config.get(RedisSourceOptions.HASH_KEY_PARSE_MODE);
// set expire
- this.expire = config.get(RedisConfig.EXPIRE);
+ this.expire = config.get(RedisSinkOptions.EXPIRE);
// set auth
- if (config.getOptional(RedisConfig.AUTH).isPresent()) {
- this.auth = config.get(RedisConfig.AUTH);
+ if (config.getOptional(RedisBaseOptions.AUTH).isPresent()) {
+ this.auth = config.get(RedisBaseOptions.AUTH);
}
// set user
- if (config.getOptional(RedisConfig.USER).isPresent()) {
- this.user = config.get(RedisConfig.USER);
+ if (config.getOptional(RedisBaseOptions.USER).isPresent()) {
+ this.user = config.get(RedisBaseOptions.USER);
}
// set mode
- this.mode = config.get(RedisConfig.MODE);
+ this.mode = config.get(RedisBaseOptions.MODE);
// set redis nodes information
- if (config.getOptional(RedisConfig.NODES).isPresent()) {
- this.redisNodes = config.get(RedisConfig.NODES);
+ if (config.getOptional(RedisBaseOptions.NODES).isPresent()) {
+ this.redisNodes = config.get(RedisBaseOptions.NODES);
}
// set key
- if (config.getOptional(RedisConfig.KEY).isPresent()) {
- this.keyField = config.get(RedisConfig.KEY);
+ if (config.getOptional(RedisBaseOptions.KEY).isPresent()) {
+ this.keyField = config.get(RedisBaseOptions.KEY);
}
// set keysPattern
- if (config.getOptional(RedisConfig.KEY_PATTERN).isPresent()) {
- this.keysPattern = config.get(RedisConfig.KEY_PATTERN);
+ if (config.getOptional(RedisBaseOptions.KEY_PATTERN).isPresent()) {
+ this.keysPattern = config.get(RedisBaseOptions.KEY_PATTERN);
}
// set redis data type verification factory createAndPrepareSource
- this.redisDataType = config.get(RedisConfig.DATA_TYPE);
+ this.redisDataType = config.get(RedisBaseOptions.DATA_TYPE);
// Indicates the number of keys to attempt to return per
iteration.default 10
- this.batchSize = config.get(RedisConfig.BATCH_SIZE);
+ this.batchSize = config.get(RedisBaseOptions.BATCH_SIZE);
// set support custom key
- if (config.getOptional(RedisConfig.SUPPORT_CUSTOM_KEY).isPresent()) {
- this.supportCustomKey = config.get(RedisConfig.SUPPORT_CUSTOM_KEY);
+ if
(config.getOptional(RedisSinkOptions.SUPPORT_CUSTOM_KEY).isPresent()) {
+ this.supportCustomKey =
config.get(RedisSinkOptions.SUPPORT_CUSTOM_KEY);
}
// set value field
- if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) {
- this.valueField = config.get(RedisConfig.VALUE_FIELD);
+ if (config.getOptional(RedisSinkOptions.VALUE_FIELD).isPresent()) {
+ this.valueField = config.get(RedisSinkOptions.VALUE_FIELD);
}
// set hash key field
- if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) {
- this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD);
+ if (config.getOptional(RedisSinkOptions.HASH_KEY_FIELD).isPresent()) {
+ this.hashKeyField = config.get(RedisSinkOptions.HASH_KEY_FIELD);
}
// set hash value field
- if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) {
- this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD);
+ if (config.getOptional(RedisSinkOptions.HASH_VALUE_FIELD).isPresent())
{
+ this.hashValueField =
config.get(RedisSinkOptions.HASH_VALUE_FIELD);
}
}
public RedisClient buildRedisClient() {
Jedis jedis = this.buildJedis();
this.redisVersion = extractRedisVersion(jedis);
- if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
+ if (mode.equals(RedisBaseOptions.RedisMode.SINGLE)) {
return new RedisSingleClient(this, jedis, redisVersion);
} else {
return new RedisClusterClient(this, jedis, redisVersion);
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java
new file mode 100644
index 0000000000..960204321d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSinkOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class RedisSinkOptions extends RedisBaseOptions {
+
+ public static final Option<Long> EXPIRE =
+ Options.key("expire")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription("Set redis expiration time.");
+
+ public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
+ Options.key("support_custom_key")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "if true, the key can be customized by the field
value in the upstream data.");
+ public static final Option<String> VALUE_FIELD =
+ Options.key("value_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The field of value you want to write to redis,
support string list set zset");
+ public static final Option<String> HASH_KEY_FIELD =
+ Options.key("hash_key_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The field of hash key you want to write
to redis");
+
+ public static final Option<String> HASH_VALUE_FIELD =
+ Options.key("hash_value_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The field of hash value you want to
write to redis");
+}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
new file mode 100644
index 0000000000..a02e113cea
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class RedisSourceOptions extends RedisBaseOptions {
+ public enum HashKeyParseMode {
+ ALL,
+ KV;
+ }
+
+ public static final Option<HashKeyParseMode> HASH_KEY_PARSE_MODE =
+ Options.key("hash_key_parse_mode")
+ .enumType(HashKeyParseMode.class)
+ .defaultValue(HashKeyParseMode.ALL)
+ .withDescription(
+ "hash key parse mode, support all or kv, default
value is all");
+}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index ddb1901205..67774821ed 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import java.io.IOException;
@@ -46,7 +46,7 @@ public class RedisSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
@Override
public String getPluginName() {
- return RedisConfig.CONNECTOR_IDENTITY;
+ return RedisBaseOptions.CONNECTOR_IDENTITY;
}
@Override
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index 38098a2560..9204f2c602 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -24,7 +24,8 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSinkOptions;
import com.google.auto.service.AutoService;
@@ -45,20 +46,26 @@ public class RedisSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(
- RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY,
RedisConfig.DATA_TYPE)
+ RedisBaseOptions.HOST,
+ RedisBaseOptions.PORT,
+ RedisBaseOptions.KEY,
+ RedisBaseOptions.DATA_TYPE)
.optional(
- RedisConfig.MODE,
- RedisConfig.AUTH,
- RedisConfig.USER,
- RedisConfig.KEY_PATTERN,
- RedisConfig.FORMAT,
- RedisConfig.EXPIRE,
- RedisConfig.SUPPORT_CUSTOM_KEY,
- RedisConfig.VALUE_FIELD,
- RedisConfig.HASH_KEY_FIELD,
- RedisConfig.HASH_VALUE_FIELD,
+ RedisBaseOptions.MODE,
+ RedisBaseOptions.AUTH,
+ RedisBaseOptions.USER,
+ RedisBaseOptions.KEY_PATTERN,
+ RedisBaseOptions.FORMAT,
+ RedisSinkOptions.EXPIRE,
+ RedisSinkOptions.SUPPORT_CUSTOM_KEY,
+ RedisSinkOptions.VALUE_FIELD,
+ RedisSinkOptions.HASH_KEY_FIELD,
+ RedisSinkOptions.HASH_VALUE_FIELD,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
- .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER,
RedisConfig.NODES)
+ .conditional(
+ RedisBaseOptions.MODE,
+ RedisBaseOptions.RedisMode.CLUSTER,
+ RedisBaseOptions.NODES)
.build();
}
}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 28f5693bb4..71bd16a6c7 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -48,7 +48,7 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public String getPluginName() {
- return RedisConfig.CONNECTOR_IDENTITY;
+ return RedisBaseOptions.CONNECTOR_IDENTITY;
}
public RedisSource(ReadonlyConfig readonlyConfig) {
@@ -56,7 +56,7 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
this.redisParameters.buildWithConfig(readonlyConfig);
// TODO: use format SPI
// default use json format
- if (readonlyConfig.getOptional(RedisConfig.FORMAT).isPresent()) {
+ if (readonlyConfig.getOptional(RedisBaseOptions.FORMAT).isPresent()) {
if
(!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
throw new RedisConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -67,8 +67,8 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
"Must config schema when format parameter been
config"));
}
- RedisConfig.Format format = readonlyConfig.get(RedisConfig.FORMAT);
- if (RedisConfig.Format.JSON.equals(format)) {
+ RedisBaseOptions.Format format =
readonlyConfig.get(RedisBaseOptions.FORMAT);
+ if (RedisBaseOptions.Format.JSON.equals(format)) {
this.catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.deserializationSchema =
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
index c4f9ac099e..69ee36464b 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -25,7 +25,8 @@ import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
import com.google.auto.service.AutoService;
@@ -48,18 +49,21 @@ public class RedisSourceFactory implements
TableSourceFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(
- RedisConfig.HOST,
- RedisConfig.PORT,
- RedisConfig.KEY_PATTERN,
- RedisConfig.DATA_TYPE)
+ RedisBaseOptions.HOST,
+ RedisBaseOptions.PORT,
+ RedisBaseOptions.KEY_PATTERN,
+ RedisBaseOptions.DATA_TYPE)
.optional(
- RedisConfig.MODE,
- RedisConfig.HASH_KEY_PARSE_MODE,
- RedisConfig.AUTH,
- RedisConfig.USER,
- RedisConfig.KEY)
- .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER,
RedisConfig.NODES)
- .bundled(RedisConfig.FORMAT, TableSchemaOptions.SCHEMA)
+ RedisBaseOptions.MODE,
+ RedisSourceOptions.HASH_KEY_PARSE_MODE,
+ RedisBaseOptions.AUTH,
+ RedisBaseOptions.USER,
+ RedisBaseOptions.KEY)
+ .conditional(
+ RedisBaseOptions.MODE,
+ RedisBaseOptions.RedisMode.CLUSTER,
+ RedisBaseOptions.NODES)
+ .bundled(RedisBaseOptions.FORMAT, TableSchemaOptions.SCHEMA)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index be67c266f9..bdb887c097 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -25,9 +25,9 @@ import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.commons.collections4.CollectionUtils;
@@ -175,7 +175,7 @@ public class RedisSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
return;
}
for (Map<String, String> recordsMap : values) {
- if (redisParameters.getHashKeyParseMode() ==
RedisConfig.HashKeyParseMode.KV) {
+ if (redisParameters.getHashKeyParseMode() ==
RedisSourceOptions.HashKeyParseMode.KV) {
deserializationSchema.deserialize(
JsonUtils.toJsonString(recordsMap).getBytes(), output);
} else {