This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ad7e369b8 [INLONG-5447][Sort] Add lookup support for Redis (#5543)
ad7e369b8 is described below

commit ad7e369b8d4eba628717d23f1d39334042e8f072
Author: Charles <[email protected]>
AuthorDate: Tue Aug 16 15:07:15 2022 +0800

    [INLONG-5447][Sort] Add lookup support for Redis (#5543)
---
 inlong-sort/sort-connectors/pom.xml                |   1 +
 inlong-sort/sort-connectors/redis/pom.xml          |  76 ++++++++
 .../redis/common/config/RedisLookupOptions.java    | 129 +++++++++++++
 .../sort/redis/common/config/RedisOptions.java     | 208 +++++++++++++++++++++
 .../handler/FlinkJedisClusterConfigHandler.java    |  68 +++++++
 .../handler/FlinkJedisSentinelConfigHandler.java   |  69 +++++++
 .../handler/FlinkJedisStandaloneConfigHandler.java |  66 +++++++
 .../container/InlongRedisClusterContainer.java     | 115 ++++++++++++
 .../container/InlongRedisCommandsContainer.java    |  77 ++++++++
 .../common/container/InlongRedisContainer.java     | 184 ++++++++++++++++++
 .../container/RedisCommandsContainerBuilder.java   | 129 +++++++++++++
 .../common/descriptor/InlongRedisValidator.java    |  35 ++++
 .../common/handler/InlongJedisConfigHandler.java   |  37 ++++
 .../redis/common/handler/RedisMapperHandler.java   |  67 +++++++
 .../sort/redis/common/mapper/RedisCommand.java     | 142 ++++++++++++++
 .../common/mapper/RedisCommandDescription.java     | 158 ++++++++++++++++
 .../sort/redis/common/mapper/RedisMapper.java      |  79 ++++++++
 .../sort/redis/common/mapper/row/GetMapper.java    |  32 ++++
 .../sort/redis/common/mapper/row/HgetMapper.java   |  35 ++++
 .../redis/common/mapper/row/RowRedisMapper.java    | 122 ++++++++++++
 .../redis/common/mapper/row/ZrevrankMapper.java    |  35 ++++
 .../sort/redis/common/mapper/row/ZscoreMapper.java |  35 ++++
 .../sort/redis/source/RedisDynamicTableSource.java |  93 +++++++++
 .../redis/source/RedisRowDataLookupFunction.java   | 152 +++++++++++++++
 .../sort/redis/table/RedisDynamicTableFactory.java | 148 +++++++++++++++
 .../inlong/sort/redis/table/SchemaValidator.java   |  68 +++++++
 ...ng.connectors.redis.common.hanlder.RedisHandler |  23 +++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 licenses/inlong-sort-connectors/LICENSE            |  11 ++
 licenses/inlong-sort-connectors/NOTICE             |   8 +
 pom.xml                                            |   7 +
 31 files changed, 2425 insertions(+)

diff --git a/inlong-sort/sort-connectors/pom.xml 
b/inlong-sort/sort-connectors/pom.xml
index fe6503227..14eb9f268 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -52,6 +52,7 @@
         <module>elasticsearch-base</module>
         <module>elasticsearch-6</module>
         <module>elasticsearch-7</module>
+        <module>redis</module>
         <module>tubemq</module>
     </modules>
 
diff --git a/inlong-sort/sort-connectors/redis/pom.xml 
b/inlong-sort/sort-connectors/redis/pom.xml
new file mode 100644
index 000000000..ba01ed508
--- /dev/null
+++ b/inlong-sort/sort-connectors/redis/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+        xmlns="http://maven.apache.org/POM/4.0.0";
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>sort-connectors</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-connector-redis</artifactId>
+    <name>Apache InLong - Sort-connector-redis</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.bahir</groupId>
+            
<artifactId>flink-connector-redis_${flink.scala.binary.version}</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    
<artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>
+                                            
META-INF/services/org.apache.flink.table.factories.Factory
+                                        </include>
+                                        <include>
+                                            
META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
+                                        </include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java
new file mode 100644
index 000000000..5e95c3028
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisLookupOptions.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Options for the Redis lookup.
+ */
+@Internal
+public class RedisLookupOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    /**
+     * Asynchronous processing has not been implemented yet, but the entry is 
reserved
+     */
+    private final boolean lookupAsync;
+
+    public RedisLookupOptions(
+            long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean 
lookupAsync) {
+        this.cacheMaxSize = cacheMaxSize;
+        this.cacheExpireMs = cacheExpireMs;
+        this.maxRetryTimes = maxRetryTimes;
+        this.lookupAsync = lookupAsync;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public long getCacheMaxSize() {
+        return cacheMaxSize;
+    }
+
+    public long getCacheExpireMs() {
+        return cacheExpireMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public boolean getLookupAsync() {
+        return lookupAsync;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof RedisLookupOptions) {
+            RedisLookupOptions options = (RedisLookupOptions) o;
+            return Objects.equals(cacheMaxSize, options.cacheMaxSize)
+                    && Objects.equals(cacheExpireMs, options.cacheExpireMs)
+                    && Objects.equals(maxRetryTimes, options.maxRetryTimes)
+                    && Objects.equals(lookupAsync, options.lookupAsync);
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Builder of {@link RedisLookupOptions}.
+     */
+    public static class Builder {
+
+        private long cacheMaxSize = -1L;
+        private long cacheExpireMs = 0L;
+        private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
+        private boolean lookupAsync = false;
+
+        /**
+         * optional, lookup cache max size, over this value, the old data will 
be eliminated.
+         */
+        public Builder setCacheMaxSize(long cacheMaxSize) {
+            this.cacheMaxSize = cacheMaxSize;
+            return this;
+        }
+
+        /**
+         * optional, lookup cache expire mills, over this time, the old data 
will expire.
+         */
+        public Builder setCacheExpireMs(long cacheExpireMs) {
+            this.cacheExpireMs = cacheExpireMs;
+            return this;
+        }
+
+        /**
+         * optional, max retry times for Hbase connector.
+         */
+        public Builder setMaxRetryTimes(int maxRetryTimes) {
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        /**
+         * optional, whether to set async lookup.
+         */
+        public Builder setLookupAsync(boolean lookupAsync) {
+            this.lookupAsync = lookupAsync;
+            return this;
+        }
+
+        public RedisLookupOptions build() {
+            return new RedisLookupOptions(cacheMaxSize, cacheExpireMs, 
maxRetryTimes, lookupAsync);
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisOptions.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisOptions.java
new file mode 100644
index 000000000..854aa0634
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/RedisOptions.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator;
+
+import java.time.Duration;
+
+/**
+ * Redis options
+ */
+public class RedisOptions {
+
+    /**
+     * Redis mode, contains [cluster|sentinel|standalone]
+     */
+    public static final ConfigOption<String> REDIS_MODE = ConfigOptions
+            .key("redis-mode")
+            .stringType()
+            .defaultValue(InlongRedisValidator.REDIS_STANDALONE)
+            .withDescription("Optional redis-mode for connect to redis");
+    /**
+     * Redis database, used in [sentinel|standalone] redis-mode
+     */
+    public static final ConfigOption<Integer> DATABASE = ConfigOptions
+            .key("database")
+            .intType()
+            .defaultValue(0)
+            .withDescription("Optional database for connect to redis");
+    /**
+     * Redis command
+     */
+    public static final ConfigOption<String> COMMAND = ConfigOptions
+            .key("command")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional command for connect to redis");
+    /**
+     * Password used to connect redis
+     */
+    public static final ConfigOption<String> PASSWORD = ConfigOptions
+            .key("password")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional password for connect to redis");
+    /**
+     * Cluster nodes used to connect redis with cluster mode
+     */
+    public static final ConfigOption<String> CLUSTER_NODES = ConfigOptions
+            .key(InlongRedisValidator.REDIS_NODES)
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional nodes for connect to redis cluster");
+    /**
+     * Whether to ignore delete events
+     */
+    public static final ConfigOption<Boolean> IGNORE_DELETE = ConfigOptions
+            .key("ignore.delete")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Ignore delete where receive Retraction");
+    /**
+     * Additional key used in Hash or Sorted-Set
+     */
+    public static final ConfigOption<String> ADDITIONAL_KEY = ConfigOptions
+            .key("additional.key")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional additional key for connect to redis");
+    /**
+     * Key ttl
+     */
+    public static final ConfigOption<Integer> KEY_TTL = ConfigOptions
+            .key("key.ttl")
+            .intType()
+            .noDefaultValue()
+            .withDescription("Optional key ttl for connect to redis");
+    /**
+     * Timeout for connect to redis
+     */
+    public static final ConfigOption<Integer> TIMEOUT = ConfigOptions
+            .key("timeout")
+            .intType()
+            .defaultValue(2000)
+            .withDescription("Optional timeout for connect to redis");
+    /**
+     * Socket timeout for connect to redis
+     */
+    public static final ConfigOption<Integer> SOCKET_TIMEOUT = ConfigOptions
+            .key("soTimeout")
+            .intType()
+            .defaultValue(2000)
+            .withDescription("Optional soTimeout for redis");
+    /**
+     * Max total for connect to redis
+     */
+    public static final ConfigOption<Integer> MAX_TOTAL = ConfigOptions
+            .key("maxTotal")
+            .intType()
+            .defaultValue(2)
+            .withDescription("Optional maxTotal for connect to redis");
+    /**
+     * Max idle for connect to redis
+     */
+    public static final ConfigOption<Integer> MAXIDLE = ConfigOptions
+            .key("maxIdle")
+            .intType()
+            .defaultValue(2)
+            .withDescription("Optional maxIdle for connect to redis");
+    /**
+     * Min idle for connect to redis
+     */
+    public static final ConfigOption<Integer> MINIDLE = ConfigOptions
+            .key("minIdle")
+            .intType()
+            .defaultValue(1)
+            .withDescription("Optional minIdle for connect to redis");
+    /**
+     * Port for connect to redis used in standalone mode
+     */
+    public static final ConfigOption<Integer> PORT = ConfigOptions
+            .key("port")
+            .intType()
+            .defaultValue(6379)
+            .withDescription("Optional port for connect to redis");
+    /**
+     * Port for connect to redis used in standalone mode
+     */
+    public static final ConfigOption<String> HOST = ConfigOptions
+            .key("host")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional host for connect to redis");
+    /**
+     * Redis master name for connect to redis used in sentinel mode
+     */
+    public static final ConfigOption<String> REDIS_MASTER_NAME = ConfigOptions
+            .key("master.name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional master.name for connect to redis 
sentinels");
+    /**
+     * Sentinels info for connect to redis used in sentinel mode
+     */
+    public static final ConfigOption<String> SENTINELS_INFO = ConfigOptions
+            .key("sentinels.info")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Optional sentinels.info for connect to redis 
sentinels");
+
+    /**
+     * Lookup cache max rows
+     */
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions.key("lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The max number of rows of lookup cache, over this 
value, the oldest rows will "
+                                    + "be eliminated. \"cache.max-rows\" and 
\"cache.ttl\" "
+                                    + "options must all be specified if any of 
them is "
+                                    + "specified.");
+    /**
+     * Lookup cache ttl
+     */
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+            ConfigOptions.key("lookup.cache.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The cache time to live.");
+    /**
+     * Lookup max retries
+     */
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+            ConfigOptions.key("lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The max retry times if lookup database 
failed.");
+    /**
+     * Lookup async
+     */
+    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+            ConfigOptions.key("lookup.async")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("whether to set async lookup.");
+
+    private RedisOptions() {
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
new file mode 100644
index 000000000..09ad12763
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
+
+/**
+ * jedis cluster config handler to find and create jedis cluster config use 
meta and
+ * copy from {@link 
org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisClusterConfigHandler}
+ */
+public class FlinkJedisClusterConfigHandler implements 
InlongJedisConfigHandler {
+
+    public FlinkJedisClusterConfigHandler() {
+    }
+
+    @Override
+    public FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config) {
+        String nodesInfo = config.get(RedisOptions.CLUSTER_NODES);
+        Preconditions.checkNotNull(nodesInfo, "nodes should not be null");
+        Set<InetSocketAddress> nodes = 
Arrays.stream(nodesInfo.split(",")).map(r -> {
+            String[] arr = r.split(":");
+            return new InetSocketAddress(arr[0].trim(), 
Integer.parseInt(arr[1].trim()));
+        }).collect(Collectors.toSet());
+        FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder()
+                
.setNodes(nodes).setPassword(config.get(RedisOptions.PASSWORD));
+        builder.setMaxIdle(config.get(RedisOptions.MAXIDLE))
+                .setMinIdle(config.get(RedisOptions.MINIDLE))
+                .setMaxTotal(config.get(RedisOptions.MAX_TOTAL))
+                .setTimeout(config.get(RedisOptions.TIMEOUT));
+        return builder.build();
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> require = new HashMap<>();
+        require.put(REDIS_MODE, REDIS_CLUSTER);
+        return require;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
new file mode 100644
index 000000000..c40d394ec
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_SENTINEL;
+
+/**
+ * Jedis sentinel config handler to find and create jedis Sentinel config use 
meta and
+ * copy from {@link 
org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisSentinelConfigHandler}
+ */
+public class FlinkJedisSentinelConfigHandler implements 
InlongJedisConfigHandler {
+
+    public FlinkJedisSentinelConfigHandler() {
+
+    }
+
+    @Override
+    public FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config) {
+        String masterName = config.get(RedisOptions.REDIS_MASTER_NAME);
+        String sentinelsInfo = config.get(RedisOptions.SENTINELS_INFO);
+        Objects.requireNonNull(masterName, "master should not be null in 
sentinel mode");
+        Objects.requireNonNull(sentinelsInfo, "sentinels should not be null in 
sentinel mode");
+        Set<String> sentinels = new 
HashSet<>(Arrays.asList(sentinelsInfo.split(",")));
+        String sentinelsPassword = config.get(RedisOptions.PASSWORD);
+        return new FlinkJedisSentinelConfig.Builder()
+                
.setMasterName(masterName).setSentinels(sentinels).setPassword(sentinelsPassword)
+                .setMaxIdle(config.get(RedisOptions.MAXIDLE))
+                .setMinIdle(config.get(RedisOptions.MINIDLE))
+                .setMaxTotal(config.get(RedisOptions.MAX_TOTAL))
+                .setDatabase(config.get(RedisOptions.DATABASE))
+                .setConnectionTimeout(config.get(RedisOptions.TIMEOUT))
+                .setSoTimeout(config.get(RedisOptions.SOCKET_TIMEOUT)).build();
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> require = new HashMap<>();
+        require.put(REDIS_MODE, REDIS_SENTINEL);
+        return require;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisStandaloneConfigHandler.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisStandaloneConfigHandler.java
new file mode 100644
index 000000000..4e39fdf58
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisStandaloneConfigHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.config.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+
+import java.util.HashMap;
+import java.util.Map;
+import static 
org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator.REDIS_MODE;
+import static 
org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator.REDIS_STANDALONE;
+
+/**
+ * Jedis standalone config handler to find and create jedis standalone config 
use meta.
+ */
+public class FlinkJedisStandaloneConfigHandler implements 
InlongJedisConfigHandler {
+
+    public FlinkJedisStandaloneConfigHandler() {
+
+    }
+
+    @Override
+    public FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config) {
+        String host = config.get(RedisOptions.HOST);
+        Preconditions.checkNotNull(host, "host should not be null in 
standalone mode");
+        int port = config.get(RedisOptions.PORT);
+        Preconditions.checkNotNull(port, "port should not be null in 
standalone mode");
+        String password = config.get(RedisOptions.PASSWORD);
+        FlinkJedisPoolConfig.Builder builder = new 
FlinkJedisPoolConfig.Builder()
+                .setHost(host)
+                .setPassword(password)
+                .setPort(port)
+                .setMaxIdle(config.get(RedisOptions.MAXIDLE))
+                .setMinIdle(config.get(RedisOptions.MINIDLE))
+                .setMaxTotal(config.get(RedisOptions.MAX_TOTAL))
+                .setDatabase(config.get(RedisOptions.DATABASE))
+                .setTimeout(config.get(RedisOptions.TIMEOUT));
+        return builder.build();
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> require = new HashMap<>();
+        require.put(REDIS_MODE, REDIS_STANDALONE);
+        return require;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisClusterContainer.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisClusterContainer.java
new file mode 100644
index 000000000..b2b8d957f
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisClusterContainer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+/**
+ * The redis cluster contain expand from {@link RedisClusterContainer}
+ */
+public class InlongRedisClusterContainer extends RedisClusterContainer 
implements InlongRedisCommandsContainer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(InlongRedisClusterContainer.class);
+
+    private final transient JedisCluster jedisCluster;
+
+    public InlongRedisClusterContainer(JedisCluster jedisCluster) {
+        super(jedisCluster);
+        this.jedisCluster = jedisCluster;
+    }
+
+    @Override
+    public void del(String key) {
+        try {
+            jedisCluster.del(key);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command del to key 
{}  error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void hdel(String key, String hashField) {
+        try {
+            jedisCluster.hdel(key, hashField);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command hdel to key 
{} of field {}   error message {}",
+                        key, hashField, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public String get(String key) {
+        try {
+            return jedisCluster.get(key);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with get command from key {} error 
message {}", key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public String hget(String key, String hashField) {
+        try {
+            return jedisCluster.hget(key, hashField);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with hget command from key {} of 
field {}  error message {}",
+                        key, hashField, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public Double zscore(String key, String member) {
+        try {
+            return jedisCluster.zscore(key, member);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with zscore command from key {} of 
member {}  error message {}",
+                        key, member, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public Long zrevrank(String key, String member) {
+        try {
+            return jedisCluster.zrevrank(key, member);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with zrevrank command from key {} 
of member {}  error message {}",
+                        key, member, e.getMessage());
+            }
+            throw e;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisCommandsContainer.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisCommandsContainer.java
new file mode 100644
index 000000000..a7d675810
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisCommandsContainer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+
+/**
+ * The container interface expand from {@link RedisCommandsContainer}
+ */
+public interface InlongRedisCommandsContainer extends RedisCommandsContainer {
+
+    /**
+     * Delete value from specified key.
+     *
+     * @param key the key  to be delete
+     */
+    void del(String key);
+
+    /**
+     * Delete field in the hash stored .
+     *
+     * @param key Hash name
+     * @param hashField Hash field
+     */
+    void hdel(String key, String hashField);
+
+    /**
+     * Get value from specified key
+     *
+     * @param key The specified key
+     * @return The value of specified key
+     */
+    String get(String key);
+
+    /**
+     * Get value from specified key with hashField
+     *
+     * @param key The specified key
+     * @param hashField The hash field
+     * @return The value of specified key
+     */
+    String hget(String key, String hashField);
+
+    /**
+     * Get value from specified key with hashField
+     *
+     * @param key The specified key
+     * @param member The member of sorted-set
+     * @return The value of specified key
+     */
+    Double zscore(String key, String member);
+
+    /**
+     * Get value from specified key with member
+     *
+     * @param key The specified key
+     * @param member The member of sorted-set
+     * @return The value of specified key with member
+     */
+    Long zrevrank(String key, String member);
+
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisContainer.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisContainer.java
new file mode 100644
index 000000000..e0902e607
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/InlongRedisContainer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The redis contain expand from {@link RedisContainer}
+ */
+public class InlongRedisContainer extends RedisContainer implements 
InlongRedisCommandsContainer {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(InlongRedisContainer.class);
+    private final transient JedisPool jedisPool;
+    private final transient JedisSentinelPool jedisSentinelPool;
+
+    /**
+     * Use this constructor if to connect with single Redis server.
+     *
+     * @param jedisPool JedisPool which actually manages Jedis instances
+     */
+    public InlongRedisContainer(JedisPool jedisPool) {
+        super(jedisPool);
+        this.jedisPool = jedisPool;
+        this.jedisSentinelPool = null;
+    }
+
+    /**
+     * Use this constructor if Redis environment is clustered with sentinels.
+     *
+     * @param sentinelPool SentinelPool which actually manages Jedis instances
+     */
+    public InlongRedisContainer(JedisSentinelPool sentinelPool) {
+        super(sentinelPool);
+        this.jedisPool = null;
+        this.jedisSentinelPool = sentinelPool;
+    }
+
+    @Override
+    public void del(String key) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.del(key);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis with del command to key {}  error 
message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void hdel(String key, String hashField) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.hdel(key, hashField);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis with hdel command to key {} of 
field {}  error message {}",
+                        key, hashField, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public String get(String key) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            return jedis.get(key);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with get command from key {} error 
message {}", key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public String hget(String key, String hashField) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            return jedis.hget(key, hashField);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with hget command from key {} of 
field {}  error message {}",
+                        key, hashField, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public Double zscore(String key, String member) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            return jedis.zscore(key, member);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with zscore command from key {} of 
member {}  error message {}",
+                        key, member, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public Long zrevrank(String key, String member) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            return jedis.zrevrank(key, member);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot get value with zrevrank command from key {} 
of member {}  error message {}",
+                        key, member, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    public Jedis getInstance() {
+        if (jedisSentinelPool != null) {
+            return jedisSentinelPool.getResource();
+        } else {
+            return jedisPool.getResource();
+        }
+    }
+
+    /**
+     * Closes the jedis instance after finishing the command.
+     *
+     * @param jedis The jedis instance
+     */
+    public void releaseInstance(final Jedis jedis) {
+        if (jedis == null) {
+            return;
+        }
+        try {
+            jedis.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close (return) instance to pool", e);
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 000000000..4ee8ecfc7
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Objects;
+
+/**
+ * The redis command container builder and
+ * copy from {@link 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder}
+ */
+public class RedisCommandsContainerBuilder {
+
+    /**
+     * Initialize the {@link InlongRedisCommandsContainer} based on the 
instance type.
+     *
+     * @param flinkJedisConfigBase configuration base
+     * @return @throws IllegalArgumentException if jedisPoolConfig, 
jedisClusterConfig and jedisSentinelConfig are all
+     *         null
+     */
+    public static InlongRedisCommandsContainer build(FlinkJedisConfigBase 
flinkJedisConfigBase) {
+        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
+            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) 
flinkJedisConfigBase;
+            return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
+        } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
+            FlinkJedisClusterConfig flinkJedisClusterConfig = 
(FlinkJedisClusterConfig) flinkJedisConfigBase;
+            return 
RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
+        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
+            FlinkJedisSentinelConfig flinkJedisSentinelConfig = 
(FlinkJedisSentinelConfig) flinkJedisConfigBase;
+            return 
RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
+        } else {
+            throw new IllegalArgumentException("Jedis configuration not 
found");
+        }
+    }
+
+    /**
+     * Builds container for single Redis environment.
+     *
+     * @param jedisPoolConfig configuration for JedisPool
+     * @return container for single Redis environment
+     * @throws NullPointerException if jedisPoolConfig is null
+     */
+    public static InlongRedisCommandsContainer build(FlinkJedisPoolConfig 
jedisPoolConfig) {
+        Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not 
be Null");
+
+        GenericObjectPoolConfig genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisPoolConfig);
+
+        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
+                jedisPoolConfig.getPort(), 
jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+                jedisPoolConfig.getDatabase());
+        return new InlongRedisContainer(jedisPool);
+    }
+
+    /**
+     * Builds container for Redis Cluster environment.
+     *
+     * @param jedisClusterConfig configuration for JedisCluster
+     * @return container for Redis Cluster environment
+     * @throws NullPointerException if jedisClusterConfig is null
+     */
+    public static InlongRedisCommandsContainer build(FlinkJedisClusterConfig 
jedisClusterConfig) {
+        Objects.requireNonNull(jedisClusterConfig, "Redis cluster config 
should not be Null");
+
+        GenericObjectPoolConfig genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisClusterConfig);
+
+        JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
+                jedisClusterConfig.getConnectionTimeout(),
+                jedisClusterConfig.getConnectionTimeout(),
+                jedisClusterConfig.getMaxRedirections(),
+                jedisClusterConfig.getPassword(),
+                genericObjectPoolConfig);
+        return new InlongRedisClusterContainer(jedisCluster);
+    }
+
+    /**
+     * Builds container for Redis Sentinel environment.
+     *
+     * @param jedisSentinelConfig configuration for JedisSentinel
+     * @return container for Redis sentinel environment
+     * @throws NullPointerException if jedisSentinelConfig is null
+     */
+    public static InlongRedisCommandsContainer build(FlinkJedisSentinelConfig 
jedisSentinelConfig) {
+        Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config 
should not be Null");
+
+        GenericObjectPoolConfig genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisSentinelConfig);
+
+        JedisSentinelPool jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+                jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+                jedisSentinelConfig.getConnectionTimeout(), 
jedisSentinelConfig.getSoTimeout(),
+                jedisSentinelConfig.getPassword(), 
jedisSentinelConfig.getDatabase());
+        return new InlongRedisContainer(jedisSentinelPool);
+    }
+
+    public static GenericObjectPoolConfig 
getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+        GenericObjectPoolConfig genericObjectPoolConfig =
+                jedisConfig.getTestWhileIdle() ? new JedisPoolConfig() : new 
GenericObjectPoolConfig();
+        genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
+        genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
+        genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
+        genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
+        genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
+
+        return genericObjectPoolConfig;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/descriptor/InlongRedisValidator.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/descriptor/InlongRedisValidator.java
new file mode 100644
index 000000000..d17b1d01a
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/descriptor/InlongRedisValidator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.descriptor;
+
+import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
+
+/**
+ * InLong redis validator expand from {@link RedisValidator}
+ */
+public class InlongRedisValidator extends RedisValidator {
+
+    /**
+     * The standalone of redis deploy mode
+     */
+    public static final String REDIS_STANDALONE = "standalone";
+    /**
+     * Redis additional key used in hash or sorted-set
+     */
+    public static final String REDIS_ADDITIONAL_KEY = "additional.key";
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java
new file mode 100644
index 000000000..6430540fe
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/InlongJedisConfigHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.handler;
+
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler;
+
+/**
+ * handler to create flink jedis config.
+ */
+public interface InlongJedisConfigHandler extends RedisHandler {
+
+    /**
+     * create flink jedis config use sepecified properties.
+     *
+     * @param config The config
+     * @return flink jedis config
+     */
+    FlinkJedisConfigBase createFlinkJedisConfig(ReadableConfig config);
+
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
new file mode 100644
index 000000000..0f3236e90
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.handler;
+
+import org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler;
+import org.apache.inlong.sort.redis.common.mapper.RedisMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
+import static 
org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator.REDIS_ADDITIONAL_KEY;
+
+/**
+ * Handler for create redis mapper.
+ * Copy from {@link 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler}
+ */
+public interface RedisMapperHandler extends RedisHandler {
+
+    Logger LOGGER = LoggerFactory.getLogger(RedisMapperHandler.class);
+
+    /**
+     * create a correct redis mapper use properties.
+     *
+     * @param properties to create redis mapper.
+     * @return redis mapper.
+     */
+    default RedisMapper createRedisMapper(Map<String, String> properties) {
+        String ttl = properties.get(REDIS_KEY_TTL);
+        String additionalKey = properties.get(REDIS_ADDITIONAL_KEY);
+        try {
+            Class redisMapper = 
Class.forName(this.getClass().getCanonicalName());
+            if (ttl == null && additionalKey == null) {
+                return (RedisMapper) redisMapper.newInstance();
+            }
+            if (additionalKey != null && ttl != null) {
+                return (RedisMapper) redisMapper.getConstructor(Integer.class, 
String.class)
+                        .newInstance(ttl, additionalKey);
+            }
+            if (additionalKey != null) {
+                return (RedisMapper) redisMapper.getConstructor(String.class)
+                        .newInstance(additionalKey);
+            }
+            return (RedisMapper) redisMapper.getConstructor(Integer.class)
+                    .newInstance(ttl);
+        } catch (Exception e) {
+            LOGGER.error("create redis mapper failed", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
new file mode 100644
index 000000000..871a3a911
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ * Copy from {@link 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand}
+ */
+public enum RedisCommand {
+
+    /**
+     * Insert the specified value at the head of the list stored at key.
+     * If key does not exist, it is created as empty list before performing 
the push operations.
+     */
+    LPUSH(RedisDataType.LIST),
+
+    /**
+     * Insert the specified value at the tail of the list stored at key.
+     * If key does not exist, it is created as empty list before performing 
the push operation.
+     */
+    RPUSH(RedisDataType.LIST),
+
+    /**
+     * Add the specified member to the set stored at key.
+     * Specified member that is already a member of this set is ignored.
+     */
+    SADD(RedisDataType.SET),
+
+    /**
+     * Set key to hold the string value. If key already holds a value,
+     * it is overwritten, regardless of its type.
+     */
+    SET(RedisDataType.STRING),
+
+    /**
+     * Set key to hold the string value, with a time to live (TTL). If key 
already holds a value,
+     * it is overwritten, regardless of its type.
+     */
+    SETEX(RedisDataType.STRING),
+
+    /**
+     * Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
+     */
+    PFADD(RedisDataType.HYPER_LOG_LOG),
+
+    /**
+     * Posts a message to the given channel.
+     */
+    PUBLISH(RedisDataType.PUBSUB),
+
+    /**
+     * Adds the specified members with the specified score to the sorted set 
stored at key.
+     */
+    ZADD(RedisDataType.SORTED_SET),
+
+    ZINCRBY(RedisDataType.SORTED_SET),
+
+    /**
+     * Removes the specified members from the sorted set stored at key.
+     */
+    ZREM(RedisDataType.SORTED_SET),
+
+    /**
+     * Sets field in the hash stored at key to value. If key does not exist,
+     * a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
+     */
+    HSET(RedisDataType.HASH),
+
+    HINCRBY(RedisDataType.HINCRBY),
+
+    /**
+     * Delta plus for specified key.
+     */
+    INCRBY(RedisDataType.STRING),
+
+    /**
+     * Delta plus for specified key and expire the key with fixed time.
+     */
+    INCRBY_EX(RedisDataType.STRING),
+
+    /**
+     * decrease with fixed num for specified key.
+     */
+    DECRBY(RedisDataType.STRING),
+
+    /**
+     * decrease with fixed num for specified key and expire the key with fixed 
time.
+     */
+    DESCRBY_EX(RedisDataType.STRING),
+
+    /**
+     * Get value for specified key
+     */
+    GET(RedisDataType.STRING),
+    /**
+     * Get value for specified key with hash field
+     */
+    HGET(RedisDataType.HASH),
+    /**
+     * Get rank number for specified key with member
+     */
+    ZREVRANK(RedisDataType.SORTED_SET),
+    /**
+     * Get score for specified key with member
+     */
+    ZSCORE(RedisDataType.SORTED_SET);
+
+    /**
+     * The {@link RedisDataType} this command belongs to.
+     */
+    private RedisDataType redisDataType;
+
+    RedisCommand(RedisDataType redisDataType) {
+        this.redisDataType = redisDataType;
+    }
+
+    /**
+     * The {@link RedisDataType} this command belongs to.
+     *
+     * @return the {@link RedisDataType}
+     */
+    public RedisDataType getRedisDataType() {
+        return redisDataType;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 000000000..9e86e6a26
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The description of the command type.
+ * Copy from {@link 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription}
+ */
+public class RedisCommandDescription implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private RedisCommand redisCommand;
+
+    /**
+     * This additional key is needed for the group {@link RedisDataType#HASH} 
and {@link RedisDataType#SORTED_SET}.
+     * Other {@link RedisDataType} works only with two variable i.e. name of 
the list and value to be added.
+     * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} 
we need three variables.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element. Its possible to use TTL.
+     * {@link #getAdditionalKey()} used as hash name for {@link 
RedisDataType#HASH}
+     * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element 
and it's score.
+     * {@link #getAdditionalKey()} used as set name for {@link 
RedisDataType#SORTED_SET}
+     */
+    private String additionalKey;
+
+    /**
+     * This additional key is optional for the group {@link 
RedisDataType#HASH}, required for {@link
+     * RedisCommand#SETEX}.
+     * For the other types and commands, its not used.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element. Its possible to use TTL.
+     * {@link #getAdditionalTTL()} used as time to live (TTL) for {@link 
RedisDataType#HASH}
+     * <p>For {@link RedisCommand#SETEX}, we need key, value and time to live 
(TTL).
+     */
+    private Integer additionalTTL;
+
+    /**
+     * Default constructor for {@link RedisCommandDescription}.
+     * For {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} 
data types, {@code additionalKey} is
+     * required.
+     * For {@link RedisCommand#SETEX} command, {@code additionalTTL} is 
required.
+     * In both cases, if the respective variables are not provided, it throws 
an {@link IllegalArgumentException}
+     *
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalKey additional key for Hash data type
+     * @param additionalTTL additional TTL optional for Hash data type
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, String 
additionalKey, Integer additionalTTL) {
+        Objects.requireNonNull(redisCommand, "Redis command type can not be 
null");
+        this.redisCommand = redisCommand;
+        this.additionalKey = additionalKey;
+        this.additionalTTL = additionalTTL;
+
+        if (redisCommand.getRedisDataType() == RedisDataType.HASH
+                || redisCommand.getRedisDataType() == 
RedisDataType.SORTED_SET) {
+            if (additionalKey == null) {
+                throw new IllegalArgumentException("Hash and Sorted-Set should 
have additional key");
+            }
+        }
+
+        if (redisCommand.equals(RedisCommand.SETEX)) {
+            if (additionalTTL == null) {
+                throw new IllegalArgumentException("SETEX command should have 
time to live (TTL)");
+            }
+        }
+
+        if (redisCommand.equals(RedisCommand.INCRBY_EX)) {
+            if (additionalTTL == null) {
+                throw new IllegalArgumentException("INCRBY_EX command should 
have time to live (TTL)");
+            }
+        }
+
+        if (redisCommand.equals(RedisCommand.DESCRBY_EX)) {
+            if (additionalTTL == null) {
+                throw new IllegalArgumentException("INCRBY_EX command should 
have time to live (TTL)");
+            }
+        }
+    }
+
+    /**
+     * Use this constructor when data type is {@link RedisDataType#HASH} 
(without TTL) or {@link
+     * RedisDataType#SORTED_SET}.
+     * If different data type is specified, {@code additionalKey} is ignored.
+     *
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalKey additional key for Hash and Sorted set data type
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, String 
additionalKey) {
+        this(redisCommand, additionalKey, null);
+    }
+
+    /**
+     * Use this constructor when using SETEX command {@link 
RedisDataType#STRING}.
+     * This command requires a TTL. Throws {@link IllegalArgumentException} if 
it is null.
+     *
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalTTL additional TTL required for SETEX command
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, Integer 
additionalTTL) {
+        this(redisCommand, null, additionalTTL);
+    }
+
+    /**
+     * Use this constructor when command type is not in group {@link 
RedisDataType#HASH} or {@link
+     * RedisDataType#SORTED_SET}.
+     *
+     * @param redisCommand the redis data type {@link RedisCommand}
+     */
+    public RedisCommandDescription(RedisCommand redisCommand) {
+        this(redisCommand, null, null);
+    }
+
+    /**
+     * Returns the {@link RedisCommand}.
+     *
+     * @return the command type of the mapping
+     */
+    public RedisCommand getCommand() {
+        return redisCommand;
+    }
+
+    /**
+     * Returns the additional key if data type is {@link RedisDataType#HASH} 
and {@link RedisDataType#SORTED_SET}.
+     *
+     * @return the additional key
+     */
+    public String getAdditionalKey() {
+        return additionalKey;
+    }
+
+    /**
+     * Returns the additional time to live (TTL) if data type is {@link 
RedisDataType#HASH}.
+     *
+     * @return the additional TTL
+     */
+    public Integer getAdditionalTTL() {
+        return additionalTTL;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
new file mode 100644
index 000000000..6436b7cbf
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Function that creates the description how the input data should be mapped 
to redis type.
+ * Copy from {@link 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+    /**
+     * Returns descriptor which defines data type.
+     *
+     * @return data type descriptor
+     */
+    RedisCommandDescription getCommandDescription();
+
+    /**
+     * Extracts key from data.
+     *
+     * @param data source data
+     * @return key
+     */
+    default String getKeyFromData(T data) {
+        return null;
+    }
+
+    /**
+     * Extracts value from data.
+     *
+     * @param data source data
+     * @return value
+     */
+    default String getValueFromData(T data) {
+        return null;
+    }
+
+    /**
+     * Extracts the additional key from data as an {@link Optional <String/>}.
+     * The default implementation returns an empty Optional.
+     *
+     * @param data
+     * @return Optional
+     */
+    default Optional<String> getAdditionalKey(T data) {
+        return Optional.empty();
+    }
+
+    /**
+     * Extracts the additional time to live (TTL) for data.
+     * The default implementation returns an empty Optional.
+     *
+     * @param data
+     * @return Optional
+     */
+    default Optional<Integer> getAdditionalTTL(T data) {
+        return Optional.empty();
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/GetMapper.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/GetMapper.java
new file mode 100644
index 000000000..f8fc1cffa
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/GetMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Get mapper that used to get value from a specified key
+ */
+public class GetMapper extends RowRedisMapper {
+
+    private static final long serialVersionUID = 1L;
+
+    public GetMapper() {
+        super(RedisCommand.GET);
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/HgetMapper.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/HgetMapper.java
new file mode 100644
index 000000000..0e61f0543
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/HgetMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Hget mapper that used to get value from a specified key with hash field
+ */
+public class HgetMapper extends RowRedisMapper {
+
+    public HgetMapper() {
+        super(RedisCommand.HGET);
+    }
+
+    public HgetMapper(String additionalKey) {
+        super(additionalKey, RedisCommand.HGET);
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
new file mode 100644
index 000000000..a2838cce6
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.redis.common.handler.RedisMapperHandler;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommandDescription;
+import org.apache.inlong.sort.redis.common.mapper.RedisMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;
+
+/**
+ * Base row redis mapper implement.
+ * Copy from {@link 
org.apache.flink.streaming.connectors.redis.common.mapper.row.RowRedisMapper}
+ */
+public abstract class RowRedisMapper implements RedisMapper<RowData>, 
RedisMapperHandler {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RowRedisMapper.class);
+
+    private Integer ttl;
+
+    private RedisCommand redisCommand;
+
+    private String additionalKey;
+
+    public RowRedisMapper() {
+    }
+
+    public RowRedisMapper(Integer ttl, RedisCommand redisCommand) {
+        this(ttl, null, redisCommand);
+    }
+
+    public RowRedisMapper(Integer ttl, String additionalKey, RedisCommand 
redisCommand) {
+        this.ttl = ttl;
+        this.additionalKey = additionalKey;
+        this.redisCommand = redisCommand;
+    }
+
+    public RowRedisMapper(String additionalKey, RedisCommand redisCommand) {
+        this(null, additionalKey, redisCommand);
+    }
+
+    public RowRedisMapper(RedisCommand redisCommand) {
+        this.redisCommand = redisCommand;
+    }
+
+    public Integer getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(int ttl) {
+        this.ttl = ttl;
+    }
+
+    public RedisCommand getRedisCommand() {
+        return redisCommand;
+    }
+
+    public void setRedisCommand(RedisCommand redisCommand) {
+        this.redisCommand = redisCommand;
+    }
+
+    @Override
+    public RedisCommandDescription getCommandDescription() {
+        return new RedisCommandDescription(redisCommand, additionalKey, ttl);
+    }
+
+    @Override
+    public String getKeyFromData(RowData data) {
+        return data.getString(0).toString();
+    }
+
+    @Override
+    public String getValueFromData(RowData data) {
+        return data.getString(1).toString();
+    }
+
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> require = new HashMap<>();
+        require.put(REDIS_COMMAND, 
getRedisCommand().name().toLowerCase(Locale.ROOT));
+        return require;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        RedisCommand redisCommand = ((RowRedisMapper) obj).redisCommand;
+        return this.redisCommand == redisCommand;
+    }
+
+    @Override
+    public Optional<Integer> getAdditionalTTL(RowData data) {
+        return Optional.ofNullable(getTtl());
+    }
+
+    @Override
+    public Optional<String> getAdditionalKey(RowData data) {
+        return Optional.ofNullable(additionalKey);
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZrevrankMapper.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZrevrankMapper.java
new file mode 100644
index 000000000..62322635e
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZrevrankMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Zrevrank mapper that used to get rank number from a specified key with 
member
+ */
+public class ZrevrankMapper extends RowRedisMapper {
+
+    public ZrevrankMapper(String additionalKey) {
+        super(additionalKey, RedisCommand.ZREVRANK);
+    }
+
+    public ZrevrankMapper() {
+        super(RedisCommand.ZREVRANK);
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZscoreMapper.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZscoreMapper.java
new file mode 100644
index 000000000..0dd7a220b
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/ZscoreMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.common.mapper.row;
+
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+/**
+ * Zscore mapper that used to get score from a specified key with member
+ */
+public class ZscoreMapper extends RowRedisMapper {
+
+    public ZscoreMapper(String additionalKey) {
+        super(additionalKey, RedisCommand.ZSCORE);
+    }
+
+    public ZscoreMapper() {
+        super(RedisCommand.ZSCORE);
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
new file mode 100644
index 000000000..30b6766e4
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisDynamicTableSource.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.source;
+
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.handler.InlongJedisConfigHandler;
+import org.apache.inlong.sort.redis.common.handler.RedisMapperHandler;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.common.mapper.RedisMapper;
+import org.apache.inlong.sort.redis.table.SchemaValidator;
+
+import java.util.Map;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+
+/**
+ * Redis dynamic table source
+ */
+public class RedisDynamicTableSource implements LookupTableSource {
+
+    private final FlinkJedisConfigBase flinkJedisConfigBase;
+
+    private final RedisMapper redisMapper;
+    private final ResolvedSchema tableSchema;
+    private final ReadableConfig config;
+    private final RedisLookupOptions redisLookupOptions;
+    private final Map<String, String> properties;
+
+    public RedisDynamicTableSource(Map<String, String> properties, 
ResolvedSchema tableSchema,
+            ReadableConfig config, RedisLookupOptions redisLookupOptions) {
+        this.properties = properties;
+        Preconditions.checkNotNull(properties, "properties should not be 
null");
+        this.tableSchema = tableSchema;
+        Preconditions.checkNotNull(tableSchema, "tableSchema should not be 
null");
+        this.config = config;
+        properties.putIfAbsent(RedisOptions.REDIS_MODE.key(), 
config.get(RedisOptions.REDIS_MODE));
+        redisMapper = RedisHandlerServices
+                .findRedisHandler(RedisMapperHandler.class, properties)
+                .createRedisMapper(properties);
+        RedisCommand command = 
redisMapper.getCommandDescription().getCommand();
+        new SchemaValidator()
+                .register(RedisCommand.GET, new LogicalTypeRoot[]{VARCHAR, 
VARCHAR})
+                .register(RedisCommand.HGET, new LogicalTypeRoot[]{VARCHAR, 
VARCHAR})
+                .register(RedisCommand.ZSCORE, new LogicalTypeRoot[]{VARCHAR, 
DOUBLE})
+                .register(RedisCommand.ZREVRANK, new 
LogicalTypeRoot[]{VARCHAR, BIGINT})
+                .validate(command, tableSchema);
+        flinkJedisConfigBase = RedisHandlerServices
+                .findRedisHandler(InlongJedisConfigHandler.class, 
properties).createFlinkJedisConfig(config);
+        this.redisLookupOptions = redisLookupOptions;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new RedisDynamicTableSource(properties, tableSchema, config, 
redisLookupOptions);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "REDIS";
+    }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
+        return TableFunctionProvider.of(new RedisRowDataLookupFunction(
+                redisMapper.getCommandDescription(), flinkJedisConfigBase, 
this.redisLookupOptions));
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
new file mode 100644
index 000000000..c71a38792
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/source/RedisRowDataLookupFunction.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.source;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
+import 
org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer;
+import 
org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommandDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Redis RowData lookup function
+ */
+public class RedisRowDataLookupFunction extends TableFunction<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RedisRowDataLookupFunction.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private final FlinkJedisConfigBase flinkJedisConfigBase;
+    private final String additionalKey;
+    private final RedisCommand redisCommand;
+    private transient Cache<RowData, RowData> cache;
+    private InlongRedisCommandsContainer redisCommandsContainer;
+
+    RedisRowDataLookupFunction(RedisCommandDescription redisCommandDescription,
+            FlinkJedisConfigBase flinkJedisConfigBase, RedisLookupOptions 
redisLookupOptions) {
+        this.flinkJedisConfigBase = flinkJedisConfigBase;
+        this.redisCommand = redisCommandDescription.getCommand();
+        this.additionalKey = redisCommandDescription.getAdditionalKey();
+        this.cacheMaxSize = redisLookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = redisLookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = redisLookupOptions.getMaxRetryTimes();
+    }
+
+    /**
+     * This is a lookup method which is called by Flink framework in runtime, 
only support one key
+     *
+     * @param keys lookup keys
+     */
+    public void eval(Object... keys) {
+        RowData keyRow = GenericRowData.of(keys);
+        if (cache != null) {
+            RowData cachedRow = cache.getIfPresent(keyRow);
+            if (cachedRow != null) {
+                collect(cachedRow);
+                return;
+            }
+        }
+        for (int retry = 0; retry <= maxRetryTimes; retry++) {
+            try {
+                RowData rowData;
+                switch (redisCommand) {
+                    case GET:
+                        rowData = GenericRowData
+                                .of(StringData.fromString(keys[0].toString()), 
StringData
+                                        
.fromString(this.redisCommandsContainer.get(keys[0].toString())));
+                        break;
+                    case HGET:
+                        rowData = GenericRowData
+                                .of(StringData.fromString(keys[0].toString()), 
StringData.fromString(
+                                        
this.redisCommandsContainer.hget(this.additionalKey, keys[0].toString())));
+                        break;
+                    case ZREVRANK:
+                        rowData = GenericRowData
+                                .of(StringData.fromString(keys[0].toString()),
+                                        
this.redisCommandsContainer.zrevrank(this.additionalKey, keys[0].toString()));
+                        break;
+                    case ZSCORE:
+                        rowData = GenericRowData
+                                .of(StringData.fromString(keys[0].toString()),
+                                        
this.redisCommandsContainer.zscore(this.additionalKey, keys[0].toString()));
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                String.format("Unsupported for redisCommand: 
%s", redisCommand));
+                }
+                if (cache == null) {
+                    collect(rowData);
+                } else {
+                    collect(rowData);
+                    cache.put(keyRow, rowData);
+                }
+                break;
+            } catch (Exception e) {
+                LOG.error(String.format("Redis query error, retry times = %d", 
retry), e);
+                if (retry >= maxRetryTimes) {
+                    throw new RuntimeException("Redis query error failed.", e);
+                }
+                try {
+                    Thread.sleep(1000 * retry);
+                } catch (InterruptedException e1) {
+                    throw new RuntimeException(e1);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        try {
+            this.redisCommandsContainer = 
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+            this.redisCommandsContainer.open();
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : 
CacheBuilder.newBuilder()
+                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                    .maximumSize(cacheMaxSize)
+                    .build();
+        } catch (Exception e) {
+            LOG.error("Redis has not been properly initialized: ", e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (redisCommandsContainer != null) {
+            redisCommandsContainer.close();
+        }
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
new file mode 100644
index 000000000..46d8bf435
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.redis.common.config.RedisLookupOptions;
+import org.apache.inlong.sort.redis.common.config.RedisOptions;
+import org.apache.inlong.sort.redis.common.descriptor.InlongRedisValidator;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+import org.apache.inlong.sort.redis.source.RedisDynamicTableSource;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkState;
+import static 
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_ASYNC;
+import static 
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_CACHE_MAX_ROWS;
+import static 
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_CACHE_TTL;
+import static 
org.apache.inlong.sort.redis.common.config.RedisOptions.LOOKUP_MAX_RETRIES;
+
+/**
+ * Redis dynamic table factory
+ */
+public class RedisDynamicTableFactory implements DynamicTableSourceFactory {
+
+    /**
+     * The identifier of Redis Connector
+     */
+    public static final String IDENTIFIER = "redis-inlong";
+    /**
+     * Supported redis mode, contains [standalone|cluster|sentinel].
+     */
+    public static final Set<String> SUPPORT_REDIS_MODE = new HashSet<String>() 
{
+        private static final long serialVersionUID = 1L;
+
+        {
+            add(RedisValidator.REDIS_CLUSTER);
+            add(RedisValidator.REDIS_SENTINEL);
+            add(InlongRedisValidator.REDIS_STANDALONE);
+        }
+    };
+    /**
+     * Supported redis source commands, contain [GET|HGET|ZREVRANK|ZSCORE] at 
now.
+     */
+    public static Set<String> SUPPORT_SOURCE_COMMANDS = new HashSet<String>() {
+        private static final long serialVersionUID = 1L;
+
+        {
+            add(RedisCommand.GET.name());
+            add(RedisCommand.HGET.name());
+            add(RedisCommand.ZREVRANK.name());
+            add(RedisCommand.ZSCORE.name());
+        }
+    };
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        ReadableConfig config = helper.getOptions();
+        helper.validate();
+        validateConfigOptions(config, SUPPORT_SOURCE_COMMANDS);
+        return new 
RedisDynamicTableSource(context.getCatalogTable().getOptions(),
+                context.getCatalogTable().getResolvedSchema(), config, 
getJdbcLookupOptions(config));
+    }
+
+    private RedisLookupOptions getJdbcLookupOptions(ReadableConfig 
readableConfig) {
+        return new 
RedisLookupOptions(readableConfig.get(LOOKUP_CACHE_MAX_ROWS),
+                readableConfig.get(LOOKUP_CACHE_TTL).toMillis(),
+                readableConfig.get(LOOKUP_MAX_RETRIES), 
readableConfig.get(LOOKUP_ASYNC));
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(RedisOptions.COMMAND);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(RedisOptions.CLUSTER_NODES);
+        options.add(RedisOptions.DATABASE);
+        options.add(RedisOptions.PASSWORD);
+        options.add(RedisOptions.REDIS_MODE);
+        options.add(RedisOptions.ADDITIONAL_KEY);
+        options.add(RedisOptions.MAXIDLE);
+        options.add(RedisOptions.MINIDLE);
+        options.add(RedisOptions.REDIS_MASTER_NAME);
+        options.add(LOOKUP_ASYNC);
+        options.add(LOOKUP_CACHE_MAX_ROWS);
+        options.add(LOOKUP_CACHE_TTL);
+        options.add(LOOKUP_MAX_RETRIES);
+        options.add(RedisOptions.HOST);
+        options.add(RedisOptions.MAX_TOTAL);
+        options.add(RedisOptions.PORT);
+        options.add(RedisOptions.SENTINELS_INFO);
+        options.add(RedisOptions.SOCKET_TIMEOUT);
+        options.add(RedisOptions.TIMEOUT);
+        return options;
+    }
+
+    private void validateConfigOptions(ReadableConfig config, Set<String> 
supportCommands) {
+        String redisMode = config.get(RedisOptions.REDIS_MODE);
+        List<String> matchRedisMode = SUPPORT_REDIS_MODE.stream().filter(e -> 
e.equals(redisMode.toLowerCase().trim()))
+                .collect(Collectors.toList());
+        checkState(!matchRedisMode.isEmpty(),
+                "Unsupported redis-mode " + redisMode + ". The supported 
redis-mode " + Arrays
+                        .deepToString(SUPPORT_REDIS_MODE.toArray()));
+        String command = config.get(RedisOptions.COMMAND);
+        Preconditions.checkState(!StringUtils.isNullOrWhitespaceOnly(command),
+                "Command can not be empty. The supported command are " + Arrays
+                        .deepToString(supportCommands.toArray()));
+        List<String> matchCommand = supportCommands
+                .stream().filter(e -> 
e.equals(command.toUpperCase().trim())).collect(Collectors.toList());
+        checkState(!matchCommand.isEmpty(), "Unsupported command " + command + 
". The supported command " + Arrays
+                .deepToString(supportCommands.toArray()));
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/SchemaValidator.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/SchemaValidator.java
new file mode 100644
index 000000000..3377e3202
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/SchemaValidator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.redis.table;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.redis.common.mapper.RedisCommand;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Schema validator
+ */
+public class SchemaValidator {
+
+    private final Map<RedisCommand, LogicalTypeRoot[]> schemaMap = new 
HashMap<>();
+
+    /**
+     * Register schema validator
+     *
+     * @param redisCommand The redis command
+     * @param requiredLogicalTypes The requiredLogicalTypes
+     * @return Schema validator
+     */
+    public SchemaValidator register(RedisCommand redisCommand,
+            LogicalTypeRoot[] requiredLogicalTypes) {
+        schemaMap.putIfAbsent(redisCommand, requiredLogicalTypes);
+        return this;
+    }
+
+    /**
+     * Validate the schema
+     *
+     * @param redisCommand The redis command
+     * @param tableSchema The table schema
+     */
+    public void validate(RedisCommand redisCommand, ResolvedSchema 
tableSchema) {
+        LogicalType[] logicalTypes = 
tableSchema.getColumns().stream().filter(Column::isPhysical)
+                .map(s -> 
s.getDataType().getLogicalType()).toArray(LogicalType[]::new);
+        LogicalTypeRoot[] requiredLogicalTypes = schemaMap.get(redisCommand);
+        for (int i = 0; i < requiredLogicalTypes.length; i++) {
+            Preconditions.checkState(requiredLogicalTypes[i] == 
logicalTypes[i].getTypeRoot(),
+                    "Table schema " + Arrays.deepToString(logicalTypes) + " is 
invalid. Table schema "
+                            + Arrays.deepToString(requiredLogicalTypes) + " is 
required for command " + redisCommand
+                            .name());
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
 
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
new file mode 100644
index 000000000..ad49bb0b2
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.redis.common.mapper.row.GetMapper
+org.apache.inlong.sort.redis.common.mapper.row.HgetMapper
+org.apache.inlong.sort.redis.common.mapper.row.ZrevrankMapper
+org.apache.inlong.sort.redis.common.mapper.row.ZscoreMapper
+
+org.apache.inlong.sort.redis.common.config.handler.FlinkJedisClusterConfigHandler
+org.apache.inlong.sort.redis.common.config.handler.FlinkJedisSentinelConfigHandler
+org.apache.inlong.sort.redis.common.config.handler.FlinkJedisStandaloneConfigHandler
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..def6538cf
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.redis.table.RedisDynamicTableFactory
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 77f3d5e07..1363253d1 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -517,6 +517,17 @@
   Source  : flink-cdc-connectors 2.2.1 (Please note that the software have 
been modified.)
   License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
+ 1.3.9 
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
+       
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
+       
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/container/RedisCommandsContainerBuilder.java
+       
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/handler/RedisMapperHandler.java
+       
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommand.java
+       
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisCommandDescription.java
+       
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/RedisMapper.java
+       
inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/common/mapper/row/RowRedisMapper.java
+  Source  : org.apache.bahir:flink-connector-redis_2.11:1.1-SNAPSHOT (Please 
note that the software have been modified.)
+  License : https://github.com/apache/bahir-flink/blob/master/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 
diff --git a/licenses/inlong-sort-connectors/NOTICE 
b/licenses/inlong-sort-connectors/NOTICE
index aff226883..75d6375ee 100644
--- a/licenses/inlong-sort-connectors/NOTICE
+++ b/licenses/inlong-sort-connectors/NOTICE
@@ -2382,6 +2382,14 @@ https://github.com/airlift/airlift/blob/master/LICENSE
 
 
 
+========================================================================
+
+flink-connector-redis
+Copyright 2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
 ========================================================================
 
 Jetty :: Apache JSP Implementation NOTICE
diff --git a/pom.xml b/pom.xml
index 5dfc3ac26..05800d85a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -212,6 +212,7 @@
         <flink.protobuf.version>2.7.6</flink.protobuf.version>
         
<flink.connector.mongodb.cdc.version>2.2.1</flink.connector.mongodb.cdc.version>
         
<flink.connector.oracle.cdc.version>2.2.1</flink.connector.oracle.cdc.version>
+        <flink.connector.redis>1.1.0</flink.connector.redis>
         
<qcloud.flink.cos.fs.hadoop.version>1.10.0-0.1.10</qcloud.flink.cos.fs.hadoop.version>
         <qcloud.chdfs.version>2.5</qcloud.chdfs.version>
 
@@ -315,6 +316,12 @@
                 <version>${flink.connector.mongodb.cdc.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.bahir</groupId>
+                
<artifactId>flink-connector-redis_${flink.scala.binary.version}</artifactId>
+                <version>${flink.connector.redis}</version>
+            </dependency>
+
             <!-- hive -->
             <dependency>
                 <groupId>org.apache.hive</groupId>

Reply via email to