This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ad7e369b8 [INLONG-5447][Sort] Add lookup support for Redis (#5543)
ad7e369b8 is described below
commit ad7e369b8d4eba628717d23f1d39334042e8f072
Author: Charles <[email protected]>
AuthorDate: Tue Aug 16 15:07:15 2022 +0800
[INLONG-5447][Sort] Add lookup support for Redis (#5543)
---
inlong-sort/sort-connectors/pom.xml | 1 +
inlong-sort/sort-connectors/redis/pom.xml | 76 ++++++++
.../redis/common/config/RedisLookupOptions.java | 129 +++++++++++++
.../sort/redis/common/config/RedisOptions.java | 208 +++++++++++++++++++++
.../handler/FlinkJedisClusterConfigHandler.java | 68 +++++++
.../handler/FlinkJedisSentinelConfigHandler.java | 69 +++++++
.../handler/FlinkJedisStandaloneConfigHandler.java | 66 +++++++
.../container/InlongRedisClusterContainer.java | 115 ++++++++++++
.../container/InlongRedisCommandsContainer.java | 77 ++++++++
.../common/container/InlongRedisContainer.java | 184 ++++++++++++++++++
.../container/RedisCommandsContainerBuilder.java | 129 +++++++++++++
.../common/descriptor/InlongRedisValidator.java | 35 ++++
.../common/handler/InlongJedisConfigHandler.java | 37 ++++
.../redis/common/handler/RedisMapperHandler.java | 67 +++++++
.../sort/redis/common/mapper/RedisCommand.java | 142 ++++++++++++++
.../common/mapper/RedisCommandDescription.java | 158 ++++++++++++++++
.../sort/redis/common/mapper/RedisMapper.java | 79 ++++++++
.../sort/redis/common/mapper/row/GetMapper.java | 32 ++++
.../sort/redis/common/mapper/row/HgetMapper.java | 35 ++++
.../redis/common/mapper/row/RowRedisMapper.java | 122 ++++++++++++
.../redis/common/mapper/row/ZrevrankMapper.java | 35 ++++
.../sort/redis/common/mapper/row/ZscoreMapper.java | 35 ++++
.../sort/redis/source/RedisDynamicTableSource.java | 93 +++++++++
.../redis/source/RedisRowDataLookupFunction.java | 152 +++++++++++++++
.../sort/redis/table/RedisDynamicTableFactory.java | 148 +++++++++++++++
.../inlong/sort/redis/table/SchemaValidator.java | 68 +++++++
...ng.connectors.redis.common.hanlder.RedisHandler | 23 +++
.../org.apache.flink.table.factories.Factory | 16 ++
licenses/inlong-sort-connectors/LICENSE | 11 ++
licenses/inlong-sort-connectors/NOTICE | 8 +
pom.xml | 7 +
31 files changed, 2425 insertions(+)
diff --git a/inlong-sort/sort-connectors/pom.xml
b/inlong-sort/sort-connectors/pom.xml
index fe6503227..14eb9f268 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -52,6 +52,7 @@
<module>elasticsearch-base</module>
<module>elasticsearch-6</module>
<module>elasticsearch-7</module>
+ <module>redis</module>
<module>tubemq</module>
</modules>
diff --git a/inlong-sort/sort-connectors/redis/pom.xml
b/inlong-sort/sort-connectors/redis/pom.xml
new file mode 100644
index 000000000..ba01ed508
--- /dev/null
+++ b/inlong-sort/sort-connectors/redis/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>sort-connectors</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>sort-connector-redis</artifactId>
+ <name>Apache InLong - Sort-connector-redis</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+
<artifactId>flink-connector-redis_${flink.scala.binary.version}</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+ <include>
+
META-INF/services/org.apache.flink.table.factories.Factory
+ </include>
+ <include>
+
META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
+ </include>
+ </includes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java
new file mode 100644
index 000000000..5e95c3028
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Options for the Redis lookup.
+ */
+@Internal
+public class RedisLookupOptions implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+ /**
+ * Asynchronous processing has not been implemented yet, but the entry is
reserved
+ */
+ private final boolean lookupAsync;
+
+ public RedisLookupOptions(
+ long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean
lookupAsync) {
+ this.cacheMaxSize = cacheMaxSize;
+ this.cacheExpireMs = cacheExpireMs;
+ this.maxRetryTimes = maxRetryTimes;
+ this.lookupAsync = lookupAsync;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public long getCacheMaxSize() {
+ return cacheMaxSize;
+ }
+
+ public long getCacheExpireMs() {
+ return cacheExpireMs;
+ }
+
+ public int getMaxRetryTimes() {
+ return maxRetryTimes;
+ }
+
+ public boolean getLookupAsync() {
+ return lookupAsync;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof RedisLookupOptions) {
+ RedisLookupOptions options = (RedisLookupOptions) o;
+ return Objects.equals(cacheMaxSize, options.cacheMaxSize)
+ && Objects.equals(cacheExpireMs, options.cacheExpireMs)
+ && Objects.equals(maxRetryTimes, options.maxRetryTimes)
+ && Objects.equals(lookupAsync, options.lookupAsync);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Builder of {@link RedisLookupOptions}.
+ */
+ public static class Builder {
+
+ private long cacheMaxSize = -1L;
+ private long cacheExpireMs = 0L;
+ private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
+ private boolean lookupAsync = false;
+
+ /**
+ * optional, lookup cache max size, over this value, the old data will
be eliminated.
+ */
+ public Builder setCacheMaxSize(long cacheMaxSize) {
+ this.cacheMaxSize = cacheMaxSize;
+ return this;
+ }
+
+ /**
+ * optional, lookup cache expire mills, over this time, the old data
will expire.
+ */
+ public Builder setCacheExpireMs(long cacheExpireMs) {
+ this.cacheExpireMs = cacheExpireMs;
+ return this;
+ }
+
+ /**
+ * optional, max retry times for Hbase connector.
+ */
+ public Builder setMaxRetryTimes(int maxRetryTimes) {
+ this.maxRetryTimes = maxRetryTimes;
+ return this;
+ }
+
+ /**
+ * optional, whether to set async lookup.
+ */
+ public Builder setLookupAsync(boolean lookupAsync) {
+ this.lookupAsync = lookupAsync;
+ return this;
+ }
+
+ public RedisLookupOptions build() {
+ return new RedisLookupOptions(cacheMaxSize, cacheExpireMs,
maxRetryTimes, lookupAsync);
+ }
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisOptions.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisOptions.java
new file mode 100644
index 000000000..854aa0634
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisOptions.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator;
+
+import java.time.Duration;
+
+/**
+ * Redis options
+ */
+public class RedisOptions {
+
+ /**
+ * Redis mode, contains [cluster|sentinel|standalone]
+ */
+ public static final ConfigOption<String> REDIS_MODE = ConfigOptions
+ .key("redis-mode")
+ .stringType()
+ .defaultValue(InlongRedisValidator.REDIS_STANDALONE)
+ .withDescription("Optional redis-mode for connect to redis");
+ /**
+ * Redis database, used in [sentinel|standalone] redis-mode
+ */
+ public static final ConfigOption<Integer> DATABASE = ConfigOptions
+ .key("database")
+ .intType()
+ .defaultValue(0)
+ .withDescription("Optional database for connect to redis");
+ /**
+ * Redis command
+ */
+ public static final ConfigOption<String> COMMAND = ConfigOptions
+ .key("command")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional command for connect to redis");
+ /**
+ * Password used to connect redis
+ */
+ public static final ConfigOption<String> PASSWORD = ConfigOptions
+ .key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional password for connect to redis");
+ /**
+ * Cluster nodes used to connect redis with cluster mode
+ */
+ public static final ConfigOption<String> CLUSTER_NODES = ConfigOptions
+ .key(InlongRedisValidator.REDIS_NODES)
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional nodes for connect to redis cluster");
+ /**
+ * Whether to ignore delete events
+ */
+ public static final ConfigOption<Boolean> IGNORE_DELETE = ConfigOptions
+ .key("ignore.delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Ignore delete where receive Retraction");
+ /**
+ * Additional key used in Hash or Sorted-Set
+ */
+ public static final ConfigOption<String> ADDITIONAL_KEY = ConfigOptions
+ .key("additional.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional additional key for connect to redis");
+ /**
+ * Key ttl
+ */
+ public static final ConfigOption<Integer> KEY_TTL = ConfigOptions
+ .key("key.ttl")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Optional key ttl for connect to redis");
+ /**
+ * Timeout for connect to redis
+ */
+ public static final ConfigOption<Integer> TIMEOUT = ConfigOptions
+ .key("timeout")
+ .intType()
+ .defaultValue(2000)
+ .withDescription("Optional timeout for connect to redis");
+ /**
+ * Socket timeout for connect to redis
+ */
+ public static final ConfigOption<Integer> SOCKET_TIMEOUT = ConfigOptions
+ .key("soTimeout")
+ .intType()
+ .defaultValue(2000)
+ .withDescription("Optional soTimeout for redis");
+ /**
+ * Max total for connect to redis
+ */
+ public static final ConfigOption<Integer> MAX_TOTAL = ConfigOptions
+ .key("maxTotal")
+ .intType()
+ .defaultValue(2)
+ .withDescription("Optional maxTotal for connect to redis");
+ /**
+ * Max idle for connect to redis
+ */
+ public static final ConfigOption<Integer> MAXIDLE = ConfigOptions
+ .key("maxIdle")
+ .intType()
+ .defaultValue(2)
+ .withDescription("Optional maxIdle for connect to redis");
+ /**
+ * Min idle for connect to redis
+ */
+ public static final ConfigOption<Integer> MINIDLE = ConfigOptions
+ .key("minIdle")
+ .intType()
+ .defaultValue(1)
+ .withDescription("Optional minIdle for connect to redis");
+ /**
+ * Port for connect to redis used in standalone mode
+ */
+ public static final ConfigOption<Integer> PORT = ConfigOptions
+ .key("port")
+ .intType()
+ .defaultValue(6379)
+ .withDescription("Optional port for connect to redis");
+ /**
+ * Port for connect to redis used in standalone mode
+ */
+ public static final ConfigOption<String> HOST = ConfigOptions
+ .key("host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional host for connect to redis");
+ /**
+ * Redis master name for connect to redis used in sentinel mode
+ */
+ public static final ConfigOption<String> REDIS_MASTER_NAME = ConfigOptions
+ .key("master.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional master.name for connect to redis
sentinels");
+ /**
+ * Sentinels info for connect to redis used in sentinel mode
+ */
+ public static final ConfigOption<String> SENTINELS_INFO = ConfigOptions
+ .key("sentinels.info")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional sentinels.info for connect to redis
sentinels");
+
+ /**
+ * Lookup cache max rows
+ */
+ public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+ ConfigOptions.key("lookup.cache.max-rows")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription(
+ "The max number of rows of lookup cache, over this
value, the oldest rows will "
+ + "be eliminated. \"cache.max-rows\" and
\"cache.ttl\" "
+ + "options must all be specified if any of
them is "
+ + "specified.");
+ /**
+ * Lookup cache ttl
+ */
+ public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+ ConfigOptions.key("lookup.cache.ttl")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription("The cache time to live.");
+ /**
+ * Lookup max retries
+ */
+ public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+ ConfigOptions.key("lookup.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("The max retry times if lookup database
failed.");
+ /**
+ * Lookup async
+ */
+ public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+ ConfigOptions.key("lookup.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("whether to set async lookup.");
+
+ private RedisOptions() {
+ }
+
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
new file mode 100644
index 000000000..09ad12763
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
+
+/**
+ * jedis cluster config handler to find and create jedis cluster config use
meta and
+ * copy from {@link
org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisClusterConfigHandler}
+ */
+public class FlinkJedisClusterConfigHandler implements
InlongJedisConfigHandler {
+
+ public FlinkJedisClusterConfigHandler() {
+ }
+
+ @Override
+ public FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config) {
+ String nodesInfo = config.get(RedisOptions.CLUSTER_NODES);
+ Preconditions.checkNotNull(nodesInfo, "nodes should not be null");
+ Set<InetSocketAddress> nodes =
Arrays.stream(nodesInfo.split(",")).map(r -> {
+ String[] arr = r.split(":");
+ return new InetSocketAddress(arr[0].trim(),
Integer.parseInt(arr[1].trim()));
+ }).collect(Collectors.toSet());
+ FlinkJedisClusterConfig.Builder builder = new
FlinkJedisClusterConfig.Builder()
+
.setNodes(nodes).setPassword(config.get(RedisOptions.PASSWORD));
+ builder.setMaxIdle(config.get(RedisOptions.MAXIDLE))
+ .setMinIdle(config.get(RedisOptions.MINIDLE))
+ .setMaxTotal(config.get(RedisOptions.MAX_TOTAL))
+ .setTimeout(config.get(RedisOptions.TIMEOUT));
+ return builder.build();
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(REDIS_MODE, REDIS_CLUSTER);
+ return require;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
new file mode 100644
index 000000000..c40d394ec
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_SENTINEL;
+
+/**
+ * Jedis sentinel config handler to find and create jedis Sentinel config use
meta and
+ * copy from {@link
org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisSentinelConfigHandler}
+ */
+public class FlinkJedisSentinelConfigHandler implements
InlongJedisConfigHandler {
+
+ public FlinkJedisSentinelConfigHandler() {
+
+ }
+
+ @Override
+ public FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config) {
+ String masterName = config.get(RedisOptions.REDIS_MASTER_NAME);
+ String sentinelsInfo = config.get(RedisOptions.SENTINELS_INFO);
+ Objects.requireNonNull(masterName, "master should not be null in
sentinel mode");
+ Objects.requireNonNull(sentinelsInfo, "sentinels should not be null in
sentinel mode");
+ Set<String> sentinels = new
HashSet<>(Arrays.asList(sentinelsInfo.split(",")));
+ String sentinelsPassword = config.get(RedisOptions.PASSWORD);
+ return new FlinkJedisSentinelConfig.Builder()
+
.setMasterName(masterName).setSentinels(sentinels).setPassword(sentinelsPassword)
+ .setMaxIdle(config.get(RedisOptions.MAXIDLE))
+ .setMinIdle(config.get(RedisOptions.MINIDLE))
+ .setMaxTotal(config.get(RedisOptions.MAX_TOTAL))
+ .setDatabase(config.get(RedisOptions.DATABASE))
+ .setConnectionTimeout(config.get(RedisOptions.TIMEOUT))
+ .setSoTimeout(config.get(RedisOptions.SOCKET_TIMEOUT)).build();
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(REDIS_MODE, REDIS_SENTINEL);
+ return require;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisStandaloneConfigHandler.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisStandaloneConfigHandler.java
new file mode 100644
index 000000000..4e39fdf58
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisStandaloneConfigHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+
+import java.util.HashMap;
+import java.util.Map;
+import static
org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator.REDIS_MODE;
+import static
org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator.REDIS_STANDALONE;
+
+/**
+ * Jedis standalone config handler to find and create jedis standalone config
use meta.
+ */
+public class FlinkJedisStandaloneConfigHandler implements
InlongJedisConfigHandler {
+
+ public FlinkJedisStandaloneConfigHandler() {
+
+ }
+
+ @Override
+ public FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config) {
+ String host = config.get(RedisOptions.HOST);
+ Preconditions.checkNotNull(host, "host should not be null in
standalone mode");
+ int port = config.get(RedisOptions.PORT);
+ Preconditions.checkNotNull(port, "port should not be null in
standalone mode");
+ String password = config.get(RedisOptions.PASSWORD);
+ FlinkJedisPoolConfig.Builder builder = new
FlinkJedisPoolConfig.Builder()
+ .setHost(host)
+ .setPassword(password)
+ .setPort(port)
+ .setMaxIdle(config.get(RedisOptions.MAXIDLE))
+ .setMinIdle(config.get(RedisOptions.MINIDLE))
+ .setMaxTotal(config.get(RedisOptions.MAX_TOTAL))
+ .setDatabase(config.get(RedisOptions.DATABASE))
+ .setTimeout(config.get(RedisOptions.TIMEOUT));
+ return builder.build();
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(REDIS_MODE, REDIS_STANDALONE);
+ return require;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisClusterContainer.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisClusterContainer.java
new file mode 100644
index 000000000..b2b8d957f
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisClusterContainer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import
org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+/**
+ * The redis cluster contain expand from {@link RedisClusterContainer}
+ */
+public class InlongRedisClusterContainer extends RedisClusterContainer
implements InlongRedisCommandsContainer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(InlongRedisClusterContainer.class);
+
+ private final transient JedisCluster jedisCluster;
+
+ public InlongRedisClusterContainer(JedisCluster jedisCluster) {
+ super(jedisCluster);
+ this.jedisCluster = jedisCluster;
+ }
+
+ @Override
+ public void del(String key) {
+ try {
+ jedisCluster.del(key);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command del to key
{} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void hdel(String key, String hashField) {
+ try {
+ jedisCluster.hdel(key, hashField);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command hdel to key
{} of field {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public String get(String key) {
+ try {
+ return jedisCluster.get(key);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with get command from key {} error
message {}", key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public String hget(String key, String hashField) {
+ try {
+ return jedisCluster.hget(key, hashField);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with hget command from key {} of
field {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Double zscore(String key, String member) {
+ try {
+ return jedisCluster.zscore(key, member);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with zscore command from key {} of
member {} error message {}",
+ key, member, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Long zrevrank(String key, String member) {
+ try {
+ return jedisCluster.zrevrank(key, member);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with zrevrank command from key {}
of member {} error message {}",
+ key, member, e.getMessage());
+ }
+ throw e;
+ }
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisCommandsContainer.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisCommandsContainer.java
new file mode 100644
index 000000000..a7d675810
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisCommandsContainer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+
+/**
+ * The container interface expand from {@link RedisCommandsContainer}
+ */
+public interface InlongRedisCommandsContainer extends RedisCommandsContainer {
+
+ /**
+ * Delete value from specified key.
+ *
+ * @param key the key to be delete
+ */
+ void del(String key);
+
+ /**
+ * Delete field in the hash stored .
+ *
+ * @param key Hash name
+ * @param hashField Hash field
+ */
+ void hdel(String key, String hashField);
+
+ /**
+ * Get value from specified key
+ *
+ * @param key The specified key
+ * @return The value of specified key
+ */
+ String get(String key);
+
+ /**
+ * Get value from specified key with hashField
+ *
+ * @param key The specified key
+ * @param hashField The hash field
+ * @return The value of specified key
+ */
+ String hget(String key, String hashField);
+
+ /**
+ * Get value from specified key with hashField
+ *
+ * @param key The specified key
+ * @param member The member of sorted-set
+ * @return The value of specified key
+ */
+ Double zscore(String key, String member);
+
+ /**
+ * Get value from specified key with member
+ *
+ * @param key The specified key
+ * @param member The member of sorted-set
+ * @return The value of specified key with member
+ */
+ Long zrevrank(String key, String member);
+
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisContainer.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisContainer.java
new file mode 100644
index 000000000..e0902e607
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisContainer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import
org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The redis contain expand from {@link RedisContainer}
+ */
+public class InlongRedisContainer extends RedisContainer implements
InlongRedisCommandsContainer {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(InlongRedisContainer.class);
+ private final transient JedisPool jedisPool;
+ private final transient JedisSentinelPool jedisSentinelPool;
+
+ /**
+ * Use this constructor if to connect with single Redis server.
+ *
+ * @param jedisPool JedisPool which actually manages Jedis instances
+ */
+ public InlongRedisContainer(JedisPool jedisPool) {
+ super(jedisPool);
+ this.jedisPool = jedisPool;
+ this.jedisSentinelPool = null;
+ }
+
+ /**
+ * Use this constructor if Redis environment is clustered with sentinels.
+ *
+ * @param sentinelPool SentinelPool which actually manages Jedis instances
+ */
+ public InlongRedisContainer(JedisSentinelPool sentinelPool) {
+ super(sentinelPool);
+ this.jedisPool = null;
+ this.jedisSentinelPool = sentinelPool;
+ }
+
+ @Override
+ public void del(String key) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.del(key);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis with del command to key {} error
message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void hdel(String key, String hashField) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.hdel(key, hashField);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis with hdel command to key {} of
field {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public String get(String key) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ return jedis.get(key);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with get command from key {} error
message {}", key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public String hget(String key, String hashField) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ return jedis.hget(key, hashField);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with hget command from key {} of
field {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public Double zscore(String key, String member) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ return jedis.zscore(key, member);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with zscore command from key {} of
member {} error message {}",
+ key, member, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public Long zrevrank(String key, String member) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ return jedis.zrevrank(key, member);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot get value with zrevrank command from key {}
of member {} error message {}",
+ key, member, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ public Jedis getInstance() {
+ if (jedisSentinelPool != null) {
+ return jedisSentinelPool.getResource();
+ } else {
+ return jedisPool.getResource();
+ }
+ }
+
+ /**
+ * Closes the jedis instance after finishing the command.
+ *
+ * @param jedis The jedis instance
+ */
+ public void releaseInstance(final Jedis jedis) {
+ if (jedis == null) {
+ return;
+ }
+ try {
+ jedis.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close (return) instance to pool", e);
+ }
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 000000000..4ee8ecfc7
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Objects;
+
+/**
+ * The redis command container builder and
+ * copy from {@link
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder}
+ */
+public class RedisCommandsContainerBuilder {
+
+ /**
+ * Initialize the {@link InlongRedisCommandsContainer} based on the
instance type.
+ *
+ * @param flinkJedisConfigBase configuration base
+ * @return @throws IllegalArgumentException if jedisPoolConfig,
jedisClusterConfig and jedisSentinelConfig are all
+ * null
+ */
+ public static InlongRedisCommandsContainer build(FlinkJedisConfigBase
flinkJedisConfigBase) {
+ if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
+ FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig)
flinkJedisConfigBase;
+ return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
+ } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
+ FlinkJedisClusterConfig flinkJedisClusterConfig =
(FlinkJedisClusterConfig) flinkJedisConfigBase;
+ return
RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
+ } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
+ FlinkJedisSentinelConfig flinkJedisSentinelConfig =
(FlinkJedisSentinelConfig) flinkJedisConfigBase;
+ return
RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
+ } else {
+ throw new IllegalArgumentException("Jedis configuration not
found");
+ }
+ }
+
+ /**
+ * Builds container for single Redis environment.
+ *
+ * @param jedisPoolConfig configuration for JedisPool
+ * @return container for single Redis environment
+ * @throws NullPointerException if jedisPoolConfig is null
+ */
+ public static InlongRedisCommandsContainer build(FlinkJedisPoolConfig
jedisPoolConfig) {
+ Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not
be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig =
getGenericObjectPoolConfig(jedisPoolConfig);
+
+ JedisPool jedisPool = new JedisPool(genericObjectPoolConfig,
jedisPoolConfig.getHost(),
+ jedisPoolConfig.getPort(),
jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase());
+ return new InlongRedisContainer(jedisPool);
+ }
+
+ /**
+ * Builds container for Redis Cluster environment.
+ *
+ * @param jedisClusterConfig configuration for JedisCluster
+ * @return container for Redis Cluster environment
+ * @throws NullPointerException if jedisClusterConfig is null
+ */
+ public static InlongRedisCommandsContainer build(FlinkJedisClusterConfig
jedisClusterConfig) {
+ Objects.requireNonNull(jedisClusterConfig, "Redis cluster config
should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig =
getGenericObjectPoolConfig(jedisClusterConfig);
+
+ JedisCluster jedisCluster = new
JedisCluster(jedisClusterConfig.getNodes(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ jedisClusterConfig.getPassword(),
+ genericObjectPoolConfig);
+ return new InlongRedisClusterContainer(jedisCluster);
+ }
+
+ /**
+ * Builds container for Redis Sentinel environment.
+ *
+ * @param jedisSentinelConfig configuration for JedisSentinel
+ * @return container for Redis sentinel environment
+ * @throws NullPointerException if jedisSentinelConfig is null
+ */
+ public static InlongRedisCommandsContainer build(FlinkJedisSentinelConfig
jedisSentinelConfig) {
+ Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config
should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig =
getGenericObjectPoolConfig(jedisSentinelConfig);
+
+ JedisSentinelPool jedisSentinelPool = new
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+ jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+ jedisSentinelConfig.getConnectionTimeout(),
jedisSentinelConfig.getSoTimeout(),
+ jedisSentinelConfig.getPassword(),
jedisSentinelConfig.getDatabase());
+ return new InlongRedisContainer(jedisSentinelPool);
+ }
+
+ public static GenericObjectPoolConfig
getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+ GenericObjectPoolConfig genericObjectPoolConfig =
+ jedisConfig.getTestWhileIdle() ? new JedisPoolConfig() : new
GenericObjectPoolConfig();
+ genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
+ genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
+ genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
+ genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
+ genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
+
+ return genericObjectPoolConfig;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/descriptor/InlongRedisValidator.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/descriptor/InlongRedisValidator.java
new file mode 100644
index 000000000..d17b1d01a
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/descriptor/InlongRedisValidator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.descriptor;
+
+import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
+
+/**
+ * InLong redis validator expand from {@link RedisValidator}
+ */
+public class InlongRedisValidator extends RedisValidator {
+
+ /**
+ * The standalone of redis deploy mode
+ */
+ public static final String REDIS_STANDALONE = "standalone";
+ /**
+ * Redis additional key used in hash or sorted-set
+ */
+ public static final String REDIS_ADDITIONAL_KEY = "additional.key";
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java
new file mode 100644
index 000000000..6430540fe
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler;
+
+/**
+ * handler to create flink jedis config.
+ */
+public interface InlongJedisConfigHandler extends RedisHandler {
+
+ /**
+ * create flink jedis config use sepecified properties.
+ *
+ * @param config The config
+ * @return flink jedis config
+ */
+ FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config);
+
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
new file mode 100644
index 000000000..0f3236e90
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.handler;
+
+import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler;
+import org.apache.inlong.sort.redis.common.mapper.RedisMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
+import static
org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator.REDIS_ADDITIONAL_KEY;
+
+/**
+ * Handler for create redis mapper.
+ * Copy from {@link
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler}
+ */
+public interface RedisMapperHandler extends RedisHandler {
+
+ Logger LOGGER = LoggerFactory.getLogger(RedisMapperHandler.class);
+
+ /**
+ * create a correct redis mapper use properties.
+ *
+ * @param properties to create redis mapper.
+ * @return redis mapper.
+ */
+ default RedisMapper createRedisMapper(Map<String, String> properties) {
+ String ttl = properties.get(REDIS_KEY_TTL);
+ String additionalKey = properties.get(REDIS_ADDITIONAL_KEY);
+ try {
+ Class redisMapper =
Class.forName(this.getClass().getCanonicalName());
+ if (ttl == null && additionalKey == null) {
+ return (RedisMapper) redisMapper.newInstance();
+ }
+ if (additionalKey != null && ttl != null) {
+ return (RedisMapper) redisMapper.getConstructor(Integer.class,
String.class)
+ .newInstance(ttl, additionalKey);
+ }
+ if (additionalKey != null) {
+ return (RedisMapper) redisMapper.getConstructor(String.class)
+ .newInstance(additionalKey);
+ }
+ return (RedisMapper) redisMapper.getConstructor(Integer.class)
+ .newInstance(ttl);
+ } catch (Exception e) {
+ LOGGER.error("create redis mapper failed", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
new file mode 100644
index 000000000..871a3a911
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link
RedisDataType} group.
+ * Copy from {@link
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand}
+ */
+public enum RedisCommand {
+
+ /**
+ * Insert the specified value at the head of the list stored at key.
+ * If key does not exist, it is created as empty list before performing
the push operations.
+ */
+ LPUSH(RedisDataType.LIST),
+
+ /**
+ * Insert the specified value at the tail of the list stored at key.
+ * If key does not exist, it is created as empty list before performing
the push operation.
+ */
+ RPUSH(RedisDataType.LIST),
+
+ /**
+ * Add the specified member to the set stored at key.
+ * Specified member that is already a member of this set is ignored.
+ */
+ SADD(RedisDataType.SET),
+
+ /**
+ * Set key to hold the string value. If key already holds a value,
+ * it is overwritten, regardless of its type.
+ */
+ SET(RedisDataType.STRING),
+
+ /**
+ * Set key to hold the string value, with a time to live (TTL). If key
already holds a value,
+ * it is overwritten, regardless of its type.
+ */
+ SETEX(RedisDataType.STRING),
+
+ /**
+ * Adds the element to the HyperLogLog data structure stored at the
variable name specified as first argument.
+ */
+ PFADD(RedisDataType.HYPER_LOG_LOG),
+
+ /**
+ * Posts a message to the given channel.
+ */
+ PUBLISH(RedisDataType.PUBSUB),
+
+ /**
+ * Adds the specified members with the specified score to the sorted set
stored at key.
+ */
+ ZADD(RedisDataType.SORTED_SET),
+
+ ZINCRBY(RedisDataType.SORTED_SET),
+
+ /**
+ * Removes the specified members from the sorted set stored at key.
+ */
+ ZREM(RedisDataType.SORTED_SET),
+
+ /**
+ * Sets field in the hash stored at key to value. If key does not exist,
+ * a new key holding a hash is created. If field already exists in the
hash, it is overwritten.
+ */
+ HSET(RedisDataType.HASH),
+
+ HINCRBY(RedisDataType.HINCRBY),
+
+ /**
+ * Delta plus for specified key.
+ */
+ INCRBY(RedisDataType.STRING),
+
+ /**
+ * Delta plus for specified key and expire the key with fixed time.
+ */
+ INCRBY_EX(RedisDataType.STRING),
+
+ /**
+ * decrease with fixed num for specified key.
+ */
+ DECRBY(RedisDataType.STRING),
+
+ /**
+ * decrease with fixed num for specified key and expire the key with fixed
time.
+ */
+ DESCRBY_EX(RedisDataType.STRING),
+
+ /**
+ * Get value for specified key
+ */
+ GET(RedisDataType.STRING),
+ /**
+ * Get value for specified key with hash field
+ */
+ HGET(RedisDataType.HASH),
+ /**
+ * Get rank number for specified key with member
+ */
+ ZREVRANK(RedisDataType.SORTED_SET),
+ /**
+ * Get score for specified key with member
+ */
+ ZSCORE(RedisDataType.SORTED_SET);
+
+ /**
+ * The {@link RedisDataType} this command belongs to.
+ */
+ private RedisDataType redisDataType;
+
+ RedisCommand(RedisDataType redisDataType) {
+ this.redisDataType = redisDataType;
+ }
+
+ /**
+ * The {@link RedisDataType} this command belongs to.
+ *
+ * @return the {@link RedisDataType}
+ */
+ public RedisDataType getRedisDataType() {
+ return redisDataType;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 000000000..9e86e6a26
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The description of the command type.
+ * Copy from {@link
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription}
+ */
+public class RedisCommandDescription implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private RedisCommand redisCommand;
+
+ /**
+ * This additional key is needed for the group {@link RedisDataType#HASH}
and {@link RedisDataType#SORTED_SET}.
+ * Other {@link RedisDataType} works only with two variable i.e. name of
the list and value to be added.
+ * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
we need three variables.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and
element. Its possible to use TTL.
+ * {@link #getAdditionalKey()} used as hash name for {@link
RedisDataType#HASH}
+ * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element
and it's score.
+ * {@link #getAdditionalKey()} used as set name for {@link
RedisDataType#SORTED_SET}
+ */
+ private String additionalKey;
+
+ /**
+ * This additional key is optional for the group {@link
RedisDataType#HASH}, required for {@link
+ * RedisCommand#SETEX}.
+ * For the other types and commands, its not used.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and
element. Its possible to use TTL.
+ * {@link #getAdditionalTTL()} used as time to live (TTL) for {@link
RedisDataType#HASH}
+ * <p>For {@link RedisCommand#SETEX}, we need key, value and time to live
(TTL).
+ */
+ private Integer additionalTTL;
+
+ /**
+ * Default constructor for {@link RedisCommandDescription}.
+ * For {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
data types, {@code additionalKey} is
+ * required.
+ * For {@link RedisCommand#SETEX} command, {@code additionalTTL} is
required.
+ * In both cases, if the respective variables are not provided, it throws
an {@link IllegalArgumentException}
+ *
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalKey additional key for Hash data type
+ * @param additionalTTL additional TTL optional for Hash data type
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, String
additionalKey, Integer additionalTTL) {
+ Objects.requireNonNull(redisCommand, "Redis command type can not be
null");
+ this.redisCommand = redisCommand;
+ this.additionalKey = additionalKey;
+ this.additionalTTL = additionalTTL;
+
+ if (redisCommand.getRedisDataType() == RedisDataType.HASH
+ || redisCommand.getRedisDataType() ==
RedisDataType.SORTED_SET) {
+ if (additionalKey == null) {
+ throw new IllegalArgumentException("Hash and Sorted-Set should
have additional key");
+ }
+ }
+
+ if (redisCommand.equals(RedisCommand.SETEX)) {
+ if (additionalTTL == null) {
+ throw new IllegalArgumentException("SETEX command should have
time to live (TTL)");
+ }
+ }
+
+ if (redisCommand.equals(RedisCommand.INCRBY_EX)) {
+ if (additionalTTL == null) {
+ throw new IllegalArgumentException("INCRBY_EX command should
have time to live (TTL)");
+ }
+ }
+
+ if (redisCommand.equals(RedisCommand.DESCRBY_EX)) {
+ if (additionalTTL == null) {
+ throw new IllegalArgumentException("INCRBY_EX command should
have time to live (TTL)");
+ }
+ }
+ }
+
+ /**
+ * Use this constructor when data type is {@link RedisDataType#HASH}
(without TTL) or {@link
+ * RedisDataType#SORTED_SET}.
+ * If different data type is specified, {@code additionalKey} is ignored.
+ *
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalKey additional key for Hash and Sorted set data type
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, String
additionalKey) {
+ this(redisCommand, additionalKey, null);
+ }
+
+ /**
+ * Use this constructor when using SETEX command {@link
RedisDataType#STRING}.
+ * This command requires a TTL. Throws {@link IllegalArgumentException} if
it is null.
+ *
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalTTL additional TTL required for SETEX command
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, Integer
additionalTTL) {
+ this(redisCommand, null, additionalTTL);
+ }
+
+ /**
+ * Use this constructor when command type is not in group {@link
RedisDataType#HASH} or {@link
+ * RedisDataType#SORTED_SET}.
+ *
+ * @param redisCommand the redis data type {@link RedisCommand}
+ */
+ public RedisCommandDescription(RedisCommand redisCommand) {
+ this(redisCommand, null, null);
+ }
+
+ /**
+ * Returns the {@link RedisCommand}.
+ *
+ * @return the command type of the mapping
+ */
+ public RedisCommand getCommand() {
+ return redisCommand;
+ }
+
+ /**
+ * Returns the additional key if data type is {@link RedisDataType#HASH}
and {@link RedisDataType#SORTED_SET}.
+ *
+ * @return the additional key
+ */
+ public String getAdditionalKey() {
+ return additionalKey;
+ }
+
+ /**
+ * Returns the additional time to live (TTL) if data type is {@link
RedisDataType#HASH}.
+ *
+ * @return the additional TTL
+ */
+ public Integer getAdditionalTTL() {
+ return additionalTTL;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
new file mode 100644
index 000000000..6436b7cbf
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Function that creates the description how the input data should be mapped
to redis type.
+ * Copy from {@link
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+ /**
+ * Returns descriptor which defines data type.
+ *
+ * @return data type descriptor
+ */
+ RedisCommandDescription getCommandDescription();
+
+ /**
+ * Extracts key from data.
+ *
+ * @param data source data
+ * @return key
+ */
+ default String getKeyFromData(T data) {
+ return null;
+ }
+
+ /**
+ * Extracts value from data.
+ *
+ * @param data source data
+ * @return value
+ */
+ default String getValueFromData(T data) {
+ return null;
+ }
+
+ /**
+ * Extracts the additional key from data as an {@link Optional <String/>}.
+ * The default implementation returns an empty Optional.
+ *
+ * @param data
+ * @return Optional
+ */
+ default Optional<String> getAdditionalKey(T data) {
+ return Optional.empty();
+ }
+
+ /**
+ * Extracts the additional time to live (TTL) for data.
+ * The default implementation returns an empty Optional.
+ *
+ * @param data
+ * @return Optional
+ */
+ default Optional<Integer> getAdditionalTTL(T data) {
+ return Optional.empty();
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/GetMapper.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/GetMapper.java
new file mode 100644
index 000000000..f8fc1cffa
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/GetMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Get mapper that used to get value from a specified key
+ */
+public class GetMapper extends RowRedisMapper {
+
+ private static final long serialVersionUID = 1L;
+
+ public GetMapper() {
+ super(RedisCommand.GET);
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/HgetMapper.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/HgetMapper.java
new file mode 100644
index 000000000..0e61f0543
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/HgetMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Hget mapper that used to get value from a specified key with hash field
+ */
+public class HgetMapper extends RowRedisMapper {
+
+ public HgetMapper() {
+ super(RedisCommand.HGET);
+ }
+
+ public HgetMapper(String additionalKey) {
+ super(additionalKey, RedisCommand.HGET);
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
new file mode 100644
index 000000000..a2838cce6
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.redis.common.handler.RedisMapperHandler;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommandDescription;
+import org.apache.inlong.sort.redis.common.mapper.RedisMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
+
+/**
+ * Base row redis mapper implement.
+ * Copy from {@link
org.apache.flink.streaming.connectors.redis.common.mapper.row.RowRedisMapper}
+ */
+public abstract class RowRedisMapper implements RedisMapper<RowData>,
RedisMapperHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RowRedisMapper.class);
+
+ private Integer ttl;
+
+ private RedisCommand redisCommand;
+
+ private String additionalKey;
+
+ public RowRedisMapper() {
+ }
+
+ public RowRedisMapper(Integer ttl, RedisCommand redisCommand) {
+ this(ttl, null, redisCommand);
+ }
+
+ public RowRedisMapper(Integer ttl, String additionalKey, RedisCommand
redisCommand) {
+ this.ttl = ttl;
+ this.additionalKey = additionalKey;
+ this.redisCommand = redisCommand;
+ }
+
+ public RowRedisMapper(String additionalKey, RedisCommand redisCommand) {
+ this(null, additionalKey, redisCommand);
+ }
+
+ public RowRedisMapper(RedisCommand redisCommand) {
+ this.redisCommand = redisCommand;
+ }
+
+ public Integer getTtl() {
+ return ttl;
+ }
+
+ public void setTtl(int ttl) {
+ this.ttl = ttl;
+ }
+
+ public RedisCommand getRedisCommand() {
+ return redisCommand;
+ }
+
+ public void setRedisCommand(RedisCommand redisCommand) {
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ return new RedisCommandDescription(redisCommand, additionalKey, ttl);
+ }
+
+ @Override
+ public String getKeyFromData(RowData data) {
+ return data.getString(0).toString();
+ }
+
+ @Override
+ public String getValueFromData(RowData data) {
+ return data.getString(1).toString();
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(REDIS_COMMAND,
getRedisCommand().name().toLowerCase(Locale.ROOT));
+ return require;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ RedisCommand redisCommand = ((RowRedisMapper) obj).redisCommand;
+ return this.redisCommand == redisCommand;
+ }
+
+ @Override
+ public Optional<Integer> getAdditionalTTL(RowData data) {
+ return Optional.ofNullable(getTtl());
+ }
+
+ @Override
+ public Optional<String> getAdditionalKey(RowData data) {
+ return Optional.ofNullable(additionalKey);
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZrevrankMapper.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZrevrankMapper.java
new file mode 100644
index 000000000..62322635e
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZrevrankMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Zrevrank mapper that used to get rank number from a specified key with
member
+ */
+public class ZrevrankMapper extends RowRedisMapper {
+
+ public ZrevrankMapper(String additionalKey) {
+ super(additionalKey, RedisCommand.ZREVRANK);
+ }
+
+ public ZrevrankMapper() {
+ super(RedisCommand.ZREVRANK);
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZscoreMapper.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZscoreMapper.java
new file mode 100644
index 000000000..0dd7a220b
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZscoreMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Zscore mapper that used to get score from a specified key with member
+ */
+public class ZscoreMapper extends RowRedisMapper {
+
+ public ZscoreMapper(String additionalKey) {
+ super(additionalKey, RedisCommand.ZSCORE);
+ }
+
+ public ZscoreMapper() {
+ super(RedisCommand.ZSCORE);
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
new file mode 100644
index 000000000..30b6766e4
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.source;
+
+import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+import org.apache.inlong.sort.redis.common.handler.RedisMapperHandler;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.common.mapper.RedisMapper;
+import org.apache.inlong.sort.redis.table.SchemaValidator;
+
+import java.util.Map;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+
+/**
+ * Redis dynamic table source
+ */
+public class RedisDynamicTableSource implements LookupTableSource {
+
+ private final FlinkJedisConfigBase flinkJedisConfigBase;
+
+ private final RedisMapper redisMapper;
+ private final ResolvedSchema tableSchema;
+ private final ReadableConfig config;
+ private final RedisLookupOptions redisLookupOptions;
+ private final Map<String, String> properties;
+
+ public RedisDynamicTableSource(Map<String, String> properties,
ResolvedSchema tableSchema,
+ ReadableConfig config, RedisLookupOptions redisLookupOptions) {
+ this.properties = properties;
+ Preconditions.checkNotNull(properties, "properties should not be
null");
+ this.tableSchema = tableSchema;
+ Preconditions.checkNotNull(tableSchema, "tableSchema should not be
null");
+ this.config = config;
+ properties.putIfAbsent(RedisOptions.REDIS_MODE.key(),
config.get(RedisOptions.REDIS_MODE));
+ redisMapper = RedisHandlerServices
+ .findRedisHandler(RedisMapperHandler.class, properties)
+ .createRedisMapper(properties);
+ RedisCommand command =
redisMapper.getCommandDescription().getCommand();
+ new SchemaValidator()
+ .register(RedisCommand.GET, new LogicalTypeRoot[]{VARCHAR,
VARCHAR})
+ .register(RedisCommand.HGET, new LogicalTypeRoot[]{VARCHAR,
VARCHAR})
+ .register(RedisCommand.ZSCORE, new LogicalTypeRoot[]{VARCHAR,
DOUBLE})
+ .register(RedisCommand.ZREVRANK, new
LogicalTypeRoot[]{VARCHAR, BIGINT})
+ .validate(command, tableSchema);
+ flinkJedisConfigBase = RedisHandlerServices
+ .findRedisHandler(InlongJedisConfigHandler.class,
properties).createFlinkJedisConfig(config);
+ this.redisLookupOptions = redisLookupOptions;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new RedisDynamicTableSource(properties, tableSchema, config,
redisLookupOptions);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "REDIS";
+ }
+
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
+ return TableFunctionProvider.of(new RedisRowDataLookupFunction(
+ redisMapper.getCommandDescription(), flinkJedisConfigBase,
this.redisLookupOptions));
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
new file mode 100644
index 000000000..c71a38792
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.source;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
+import
org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer;
+import
org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommandDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Redis RowData lookup function
+ */
+public class RedisRowDataLookupFunction extends TableFunction<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RedisRowDataLookupFunction.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+ private final FlinkJedisConfigBase flinkJedisConfigBase;
+ private final String additionalKey;
+ private final RedisCommand redisCommand;
+ private transient Cache<RowData, RowData> cache;
+ private InlongRedisCommandsContainer redisCommandsContainer;
+
+ RedisRowDataLookupFunction(RedisCommandDescription redisCommandDescription,
+ FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions
redisLookupOptions) {
+ this.flinkJedisConfigBase = flinkJedisConfigBase;
+ this.redisCommand = redisCommandDescription.getCommand();
+ this.additionalKey = redisCommandDescription.getAdditionalKey();
+ this.cacheMaxSize = redisLookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = redisLookupOptions.getCacheExpireMs();
+ this.maxRetryTimes = redisLookupOptions.getMaxRetryTimes();
+ }
+
+ /**
+ * This is a lookup method which is called by Flink framework in runtime,
only support one key
+ *
+ * @param keys lookup keys
+ */
+ public void eval(Object... keys) {
+ RowData keyRow = GenericRowData.of(keys);
+ if (cache != null) {
+ RowData cachedRow = cache.getIfPresent(keyRow);
+ if (cachedRow != null) {
+ collect(cachedRow);
+ return;
+ }
+ }
+ for (int retry = 0; retry <= maxRetryTimes; retry++) {
+ try {
+ RowData rowData;
+ switch (redisCommand) {
+ case GET:
+ rowData = GenericRowData
+ .of(StringData.fromString(keys[0].toString()),
StringData
+
.fromString(this.redisCommandsContainer.get(keys[0].toString())));
+ break;
+ case HGET:
+ rowData = GenericRowData
+ .of(StringData.fromString(keys[0].toString()),
StringData.fromString(
+
this.redisCommandsContainer.hget(this.additionalKey, keys[0].toString())));
+ break;
+ case ZREVRANK:
+ rowData = GenericRowData
+ .of(StringData.fromString(keys[0].toString()),
+
this.redisCommandsContainer.zrevrank(this.additionalKey, keys[0].toString()));
+ break;
+ case ZSCORE:
+ rowData = GenericRowData
+ .of(StringData.fromString(keys[0].toString()),
+
this.redisCommandsContainer.zscore(this.additionalKey, keys[0].toString()));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported for redisCommand:
%s", redisCommand));
+ }
+ if (cache == null) {
+ collect(rowData);
+ } else {
+ collect(rowData);
+ cache.put(keyRow, rowData);
+ }
+ break;
+ } catch (Exception e) {
+ LOG.error(String.format("Redis query error, retry times = %d",
retry), e);
+ if (retry >= maxRetryTimes) {
+ throw new RuntimeException("Redis query error failed.", e);
+ }
+ try {
+ Thread.sleep(1000 * retry);
+ } catch (InterruptedException e1) {
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ try {
+ this.redisCommandsContainer =
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+ this.redisCommandsContainer.open();
+ this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null :
CacheBuilder.newBuilder()
+ .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+ .maximumSize(cacheMaxSize)
+ .build();
+ } catch (Exception e) {
+ LOG.error("Redis has not been properly initialized: ", e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (redisCommandsContainer != null) {
+ redisCommandsContainer.close();
+ }
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
new file mode 100644
index 000000000..46d8bf435
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.source.RedisDynamicTableSource;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkState;
+import static
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_ASYNC;
+import static
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_CACHE_MAX_ROWS;
+import static
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_CACHE_TTL;
+import static
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_MAX_RETRIES;
+
+/**
+ * Redis dynamic table factory
+ */
+public class RedisDynamicTableFactory implements DynamicTableSourceFactory {
+
+ /**
+ * The identifier of Redis Connector
+ */
+ public static final String IDENTIFIER = "redis-inlong";
+ /**
+ * Supported redis mode, contains [standalone|cluster|sentinel].
+ */
+ public static final Set<String> SUPPORT_REDIS_MODE = new HashSet<String>()
{
+ private static final long serialVersionUID = 1L;
+
+ {
+ add(RedisValidator.REDIS_CLUSTER);
+ add(RedisValidator.REDIS_SENTINEL);
+ add(InlongRedisValidator.REDIS_STANDALONE);
+ }
+ };
+ /**
+ * Supported redis source commands, contain [GET|HGET|ZREVRANK|ZSCORE] at
now.
+ */
+ public static Set<String> SUPPORT_SOURCE_COMMANDS = new HashSet<String>() {
+ private static final long serialVersionUID = 1L;
+
+ {
+ add(RedisCommand.GET.name());
+ add(RedisCommand.HGET.name());
+ add(RedisCommand.ZREVRANK.name());
+ add(RedisCommand.ZSCORE.name());
+ }
+ };
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ ReadableConfig config = helper.getOptions();
+ helper.validate();
+ validateConfigOptions(config, SUPPORT_SOURCE_COMMANDS);
+ return new
RedisDynamicTableSource(context.getCatalogTable().getOptions(),
+ context.getCatalogTable().getResolvedSchema(), config,
getJdbcLookupOptions(config));
+ }
+
+ private RedisLookupOptions getJdbcLookupOptions(ReadableConfig
readableConfig) {
+ return new
RedisLookupOptions(readableConfig.get(LOOKUP_CACHE_MAX_ROWS),
+ readableConfig.get(LOOKUP_CACHE_TTL).toMillis(),
+ readableConfig.get(LOOKUP_MAX_RETRIES),
readableConfig.get(LOOKUP_ASYNC));
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(RedisOptions.COMMAND);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(RedisOptions.CLUSTER_NODES);
+ options.add(RedisOptions.DATABASE);
+ options.add(RedisOptions.PASSWORD);
+ options.add(RedisOptions.REDIS_MODE);
+ options.add(RedisOptions.ADDITIONAL_KEY);
+ options.add(RedisOptions.MAXIDLE);
+ options.add(RedisOptions.MINIDLE);
+ options.add(RedisOptions.REDIS_MASTER_NAME);
+ options.add(LOOKUP_ASYNC);
+ options.add(LOOKUP_CACHE_MAX_ROWS);
+ options.add(LOOKUP_CACHE_TTL);
+ options.add(LOOKUP_MAX_RETRIES);
+ options.add(RedisOptions.HOST);
+ options.add(RedisOptions.MAX_TOTAL);
+ options.add(RedisOptions.PORT);
+ options.add(RedisOptions.SENTINELS_INFO);
+ options.add(RedisOptions.SOCKET_TIMEOUT);
+ options.add(RedisOptions.TIMEOUT);
+ return options;
+ }
+
+ private void validateConfigOptions(ReadableConfig config, Set<String>
supportCommands) {
+ String redisMode = config.get(RedisOptions.REDIS_MODE);
+ List<String> matchRedisMode = SUPPORT_REDIS_MODE.stream().filter(e ->
e.equals(redisMode.toLowerCase().trim()))
+ .collect(Collectors.toList());
+ checkState(!matchRedisMode.isEmpty(),
+ "Unsupported redis-mode " + redisMode + ". The supported
redis-mode " + Arrays
+ .deepToString(SUPPORT_REDIS_MODE.toArray()));
+ String command = config.get(RedisOptions.COMMAND);
+ Preconditions.checkState(!StringUtils.isNullOrWhitespaceOnly(command),
+ "Command can not be empty. The supported command are " + Arrays
+ .deepToString(supportCommands.toArray()));
+ List<String> matchCommand = supportCommands
+ .stream().filter(e ->
e.equals(command.toUpperCase().trim())).collect(Collectors.toList());
+ checkState(!matchCommand.isEmpty(), "Unsupported command " + command +
". The supported command " + Arrays
+ .deepToString(supportCommands.toArray()));
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/SchemaValidator.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/SchemaValidator.java
new file mode 100644
index 000000000..3377e3202
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/SchemaValidator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.table;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Schema validator
+ */
+public class SchemaValidator {
+
+ private final Map<RedisCommand, LogicalTypeRoot[]> schemaMap = new
HashMap<>();
+
+ /**
+ * Register schema validator
+ *
+ * @param redisCommand The redis command
+ * @param requiredLogicalTypes The requiredLogicalTypes
+ * @return Schema validator
+ */
+ public SchemaValidator register(RedisCommand redisCommand,
+ LogicalTypeRoot[] requiredLogicalTypes) {
+ schemaMap.putIfAbsent(redisCommand, requiredLogicalTypes);
+ return this;
+ }
+
+ /**
+ * Validate the schema
+ *
+ * @param redisCommand The redis command
+ * @param tableSchema The table schema
+ */
+ public void validate(RedisCommand redisCommand, ResolvedSchema
tableSchema) {
+ LogicalType[] logicalTypes =
tableSchema.getColumns().stream().filter(Column::isPhysical)
+ .map(s ->
s.getDataType().getLogicalType()).toArray(LogicalType[]::new);
+ LogicalTypeRoot[] requiredLogicalTypes = schemaMap.get(redisCommand);
+ for (int i = 0; i < requiredLogicalTypes.length; i++) {
+ Preconditions.checkState(requiredLogicalTypes[i] ==
logicalTypes[i].getTypeRoot(),
+ "Table schema " + Arrays.deepToString(logicalTypes) + " is
invalid. Table schema "
+ + Arrays.deepToString(requiredLogicalTypes) + " is
required for command " + redisCommand
+ .name());
+ }
+ }
+}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
new file mode 100644
index 000000000..ad49bb0b2
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.redis.common.mapper.row.GetMapper
+org.apache.inlong.sort.redis.common.mapper.row.HgetMapper
+org.apache.inlong.sort.redis.common.mapper.row.ZrevrankMapper
+org.apache.inlong.sort.redis.common.mapper.row.ZscoreMapper
+
+org.apache.inlong.sort.redis.common.config.handler.FlinkJedisClusterConfigHandler
+org.apache.inlong.sort.redis.common.config.handler.FlinkJedisSentinelConfigHandler
+org.apache.inlong.sort.redis.common.config.handler.FlinkJedisStandaloneConfigHandler
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..def6538cf
--- /dev/null
+++
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.redis.table.RedisDynamicTableFactory
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 77f3d5e07..1363253d1 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -517,6 +517,17 @@
Source : flink-cdc-connectors 2.2.1 (Please note that the software have
been modified.)
License :
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+ 1.3.9
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
+
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
+
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
+
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
+
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
+
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
+
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
+
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
+ Source : org.apache.bahir:flink-connector-redis_2.11:1.1-SNAPSHOT (Please
note that the software have been modified.)
+ License : https://github.com/apache/bahir-flink/blob/master/LICENSE
+
=======================================================================
Apache InLong Subcomponents:
diff --git a/licenses/inlong-sort-connectors/NOTICE
b/licenses/inlong-sort-connectors/NOTICE
index aff226883..75d6375ee 100644
--- a/licenses/inlong-sort-connectors/NOTICE
+++ b/licenses/inlong-sort-connectors/NOTICE
@@ -2382,6 +2382,14 @@ https://github.com/airlift/airlift/blob/master/LICENSE
+========================================================================
+
+flink-connector-redis
+Copyright 2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
========================================================================
Jetty :: Apache JSP Implementation NOTICE
diff --git a/pom.xml b/pom.xml
index 5dfc3ac26..05800d85a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -212,6 +212,7 @@
<flink.protobuf.version>2.7.6</flink.protobuf.version>
<flink.connector.mongodb.cdc.version>2.2.1</flink.connector.mongodb.cdc.version>
<flink.connector.oracle.cdc.version>2.2.1</flink.connector.oracle.cdc.version>
+ <flink.connector.redis>1.1.0</flink.connector.redis>
<qcloud.flink.cos.fs.hadoop.version>1.10.0-0.1.10</qcloud.flink.cos.fs.hadoop.version>
<qcloud.chdfs.version>2.5</qcloud.chdfs.version>
@@ -315,6 +316,12 @@
<version>${flink.connector.mongodb.cdc.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+
<artifactId>flink-connector-redis_${flink.scala.binary.version}</artifactId>
+ <version>${flink.connector.redis}</version>
+ </dependency>
+
<!-- hive -->
<dependency>
<groupId>org.apache.hive</groupId>