This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e84dcb8c10 [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on redis (#5901)
e84dcb8c10 is described below
commit e84dcb8c108d7bfe5a233d16bc4a830a0d952a3e
Author: lizhenglei <[email protected]>
AuthorDate: Fri Nov 24 17:54:11 2023 +0800
[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on redis (#5901)
---
.../connectors/seatunnel/http/sink/HttpSink.java | 2 +-
.../seatunnel/redis/config/RedisConfig.java | 16 +++--
.../seatunnel/redis/config/RedisParameters.java | 70 +++++++---------------
.../connectors/seatunnel/redis/sink/RedisSink.java | 51 ++++------------
.../seatunnel/redis/sink/RedisSinkFactory.java | 9 +++
.../seatunnel/redis/source/RedisSource.java | 57 +++++++-----------
.../seatunnel/redis/source/RedisSourceFactory.java | 18 +++++-
7 files changed, 91 insertions(+), 132 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 140e24a4f0..1cf22b0164 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -75,7 +75,7 @@ public class HttpSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public String getPluginName() {
- return "Http";
+ return HttpConfig.CONNECTOR_IDENTITY;
}
@Override
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 7b0c20cbea..c8a0a02dc7 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -20,8 +20,12 @@ package
org.apache.seatunnel.connectors.seatunnel.redis.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.util.List;
+
public class RedisConfig {
+ public static final String CONNECTOR_IDENTITY = "Redis";
+
public enum RedisMode {
SINGLE,
CLUSTER;
@@ -38,8 +42,8 @@ public class RedisConfig {
.noDefaultValue()
.withDescription("redis hostname or ip");
- public static final Option<String> PORT =
-
Options.key("port").stringType().noDefaultValue().withDescription("redis port");
+ public static final Option<Integer> PORT =
+
Options.key("port").intType().noDefaultValue().withDescription("redis port");
public static final Option<String> AUTH =
Options.key("auth")
@@ -75,9 +79,9 @@ public class RedisConfig {
.noDefaultValue()
.withDescription("The value of key you want to write to
redis.");
- public static final Option<String> DATA_TYPE =
+ public static final Option<RedisDataType> DATA_TYPE =
Options.key("data_type")
- .stringType()
+ .enumType(RedisDataType.class)
.noDefaultValue()
.withDescription("redis data types, support key hash list
set zset.");
@@ -95,9 +99,9 @@ public class RedisConfig {
.withDescription(
"redis mode, support single or cluster, default
value is single");
- public static final Option<String> NODES =
+ public static final Option<List<String>> NODES =
Options.key("nodes")
- .stringType()
+ .listType()
.noDefaultValue()
.withDescription(
"redis nodes information, used in cluster mode,
must like as the following format: [host1:port1, host2:port2]");
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 0e3d642e2d..d32f3bbb0d 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
@@ -50,64 +49,41 @@ public class RedisParameters implements Serializable {
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisConfig.EXPIRE.defaultValue();
- public void buildWithConfig(Config config) {
+ public void buildWithConfig(ReadonlyConfig config) {
// set host
- this.host = config.getString(RedisConfig.HOST.key());
+ this.host = config.get(RedisConfig.HOST);
// set port
- this.port = config.getInt(RedisConfig.PORT.key());
- // set auth
- if (config.hasPath(RedisConfig.AUTH.key())) {
- this.auth = config.getString(RedisConfig.AUTH.key());
- }
+ this.port = config.get(RedisConfig.PORT);
// set db_num
- if (config.hasPath(RedisConfig.DB_NUM.key())) {
- this.dbNum = config.getInt(RedisConfig.DB_NUM.key());
+ this.dbNum = config.get(RedisConfig.DB_NUM);
+ // set hash key mode
+ this.hashKeyParseMode = config.get(RedisConfig.HASH_KEY_PARSE_MODE);
+ // set expire
+ this.expire = config.get(RedisConfig.EXPIRE);
+ // set auth
+ if (config.getOptional(RedisConfig.AUTH).isPresent()) {
+ this.auth = config.get(RedisConfig.AUTH);
}
// set user
- if (config.hasPath(RedisConfig.USER.key())) {
- this.user = config.getString(RedisConfig.USER.key());
+ if (config.getOptional(RedisConfig.USER).isPresent()) {
+ this.user = config.get(RedisConfig.USER);
}
// set mode
- if (config.hasPath(RedisConfig.MODE.key())) {
- this.mode =
- RedisConfig.RedisMode.valueOf(
-
config.getString(RedisConfig.MODE.key()).toUpperCase());
- } else {
- this.mode = RedisConfig.MODE.defaultValue();
- }
- // set hash key mode
- if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE.key())) {
- this.hashKeyParseMode =
- RedisConfig.HashKeyParseMode.valueOf(
-
config.getString(RedisConfig.HASH_KEY_PARSE_MODE.key()).toUpperCase());
- } else {
- this.hashKeyParseMode =
RedisConfig.HASH_KEY_PARSE_MODE.defaultValue();
- }
+ this.mode = config.get(RedisConfig.MODE);
// set redis nodes information
- if (config.hasPath(RedisConfig.NODES.key())) {
- this.redisNodes = config.getStringList(RedisConfig.NODES.key());
+ if (config.getOptional(RedisConfig.NODES).isPresent()) {
+ this.redisNodes = config.get(RedisConfig.NODES);
}
// set key
- if (config.hasPath(RedisConfig.KEY.key())) {
- this.keyField = config.getString(RedisConfig.KEY.key());
+ if (config.getOptional(RedisConfig.KEY).isPresent()) {
+ this.keyField = config.get(RedisConfig.KEY);
}
// set keysPattern
- if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
- this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
- }
- if (config.hasPath(RedisConfig.EXPIRE.key())) {
- this.expire = config.getLong(RedisConfig.EXPIRE.key());
- }
- // set redis data type
- try {
- String dataType = config.getString(RedisConfig.DATA_TYPE.key());
- this.redisDataType = RedisDataType.valueOf(dataType.toUpperCase());
- } catch (IllegalArgumentException e) {
- throw new RedisConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "Redis source connector only support these data types
[key, hash, list, set, zset]",
- e);
+ if (config.getOptional(RedisConfig.KEY_PATTERN).isPresent()) {
+ this.keysPattern = config.get(RedisConfig.KEY_PATTERN);
}
+ // set redis data type verification factory createAndPrepareSource
+ this.redisDataType = config.get(RedisConfig.DATA_TYPE);
}
public Jedis buildJedis() {
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index 381576c6c9..ac8c544703 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -17,61 +17,34 @@
package org.apache.seatunnel.connectors.seatunnel.redis.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
-import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
-
-import com.google.auto.service.AutoService;
import java.io.IOException;
-@AutoService(SeaTunnelSink.class)
public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private final RedisParameters redisParameters = new RedisParameters();
private SeaTunnelRowType seaTunnelRowType;
- private Config pluginConfig;
-
- @Override
- public String getPluginName() {
- return "Redis";
+ private ReadonlyConfig readonlyConfig;
+ private CatalogTable catalogTable;
+
+ public RedisSink(ReadonlyConfig config, CatalogTable table) {
+ this.readonlyConfig = config;
+ this.catalogTable = table;
+ this.redisParameters.buildWithConfig(config);
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- RedisConfig.HOST.key(),
- RedisConfig.PORT.key(),
- RedisConfig.KEY.key(),
- RedisConfig.DATA_TYPE.key());
- if (!result.isSuccess()) {
- throw new RedisConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- this.redisParameters.buildWithConfig(pluginConfig);
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public String getPluginName() {
+ return RedisConfig.CONNECTOR_IDENTITY;
}
@Override
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index 22ae156874..c4768c0618 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -18,8 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.redis.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import com.google.auto.service.AutoService;
@@ -31,6 +34,12 @@ public class RedisSinkFactory implements TableSinkFactory {
return "Redis";
}
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new RedisSink(context.getOptions(), catalogTable);
+ }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index c7ad6e6de8..3818848fc1 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -17,20 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.redis.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
@@ -40,40 +35,29 @@ import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import com.google.auto.service.AutoService;
+import com.google.common.collect.Lists;
+
+import java.util.List;
-@AutoService(SeaTunnelSource.class)
public class RedisSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private final RedisParameters redisParameters = new RedisParameters();
private SeaTunnelRowType seaTunnelRowType;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private CatalogTable catalogTable;
+
@Override
public String getPluginName() {
- return "Redis";
+ return RedisConfig.CONNECTOR_IDENTITY;
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- RedisConfig.HOST.key(),
- RedisConfig.PORT.key(),
- RedisConfig.KEY_PATTERN.key(),
- RedisConfig.DATA_TYPE.key());
- if (!result.isSuccess()) {
- throw new RedisConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.redisParameters.buildWithConfig(pluginConfig);
+ public RedisSource(ReadonlyConfig readonlyConfig) {
+
+ this.redisParameters.buildWithConfig(readonlyConfig);
// TODO: use format SPI
// default use json format
- if (pluginConfig.hasPath(RedisConfig.FORMAT.key())) {
- if (!pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
+ if (readonlyConfig.getOptional(RedisConfig.FORMAT).isPresent()) {
+ if
(!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
throw new RedisConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
@@ -83,17 +67,16 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
"Must config schema when format parameter been
config"));
}
- RedisConfig.Format format =
- RedisConfig.Format.valueOf(
-
pluginConfig.getString(RedisConfig.FORMAT.key()).toUpperCase());
+ RedisConfig.Format format = readonlyConfig.get(RedisConfig.FORMAT);
if (RedisConfig.Format.JSON.equals(format)) {
- this.seaTunnelRowType =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+ this.catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.deserializationSchema =
new JsonDeserializationSchema(false, false,
seaTunnelRowType);
}
} else {
- this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
+ this.catalogTable = CatalogTableUtil.buildSimpleTextTable();
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.deserializationSchema = null;
}
}
@@ -104,8 +87,8 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return seaTunnelRowType;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Lists.newArrayList(catalogTable);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
index 02cab4a558..c4f9ac099e 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -19,13 +19,18 @@ package
org.apache.seatunnel.connectors.seatunnel.redis.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
@AutoService(Factory.class)
public class RedisSourceFactory implements TableSourceFactory {
@Override
@@ -33,17 +38,26 @@ public class RedisSourceFactory implements
TableSourceFactory {
return "Redis";
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
RedisSource(context.getOptions());
+ }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(
- RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY,
RedisConfig.DATA_TYPE)
+ RedisConfig.HOST,
+ RedisConfig.PORT,
+ RedisConfig.KEY_PATTERN,
+ RedisConfig.DATA_TYPE)
.optional(
RedisConfig.MODE,
RedisConfig.HASH_KEY_PARSE_MODE,
RedisConfig.AUTH,
RedisConfig.USER,
- RedisConfig.KEY_PATTERN)
+ RedisConfig.KEY)
.conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER,
RedisConfig.NODES)
.bundled(RedisConfig.FORMAT, TableSchemaOptions.SCHEMA)
.build();