This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f86a478  [BAHIR-220] Add redis descriptor to enable connection as a 
table (#72)
f86a478 is described below

commit f86a47877f3e20be6c3d0fcda189474b93806a8d
Author: hzyuemeng1 <[email protected]>
AuthorDate: Sat Mar 14 08:01:30 2020 +0800

    [BAHIR-220] Add redis descriptor to enable connection as a table (#72)
    
    Add Redis table sink so we can register it to catalog,
    and use Redis as a table in SQL environment
---
 flink-connector-redis/pom.xml                      |  29 ++++
 .../streaming/connectors/redis/RedisSink.java      |  12 ++
 .../streaming/connectors/redis/RedisTableSink.java | 113 +++++++++++++
 .../connectors/redis/RedisTableSinkFactory.java    |  80 +++++++++
 .../handler/FlinkJedisClusterConfigHandler.java    |  61 +++++++
 .../handler/FlinkJedisSentinelConfigHandler.java   |  66 ++++++++
 .../common/container/RedisClusterContainer.java    |  60 +++++++
 .../common/container/RedisCommandsContainer.java   |  31 ++++
 .../redis/common/container/RedisContainer.java     |  74 +++++++++
 .../common/hanlder/FlinkJedisConfigHandler.java    |  35 ++++
 .../redis/common/hanlder/RedisHandler.java         |  46 ++++++
 .../redis/common/hanlder/RedisHandlerServices.java | 182 +++++++++++++++++++++
 .../redis/common/hanlder/RedisMapperHandler.java   |  57 +++++++
 .../redis/common/mapper/RedisCommand.java          |  23 ++-
 .../common/mapper/RedisCommandDescription.java     |  12 ++
 .../redis/common/mapper/row/DecrByExMapper.java    |  36 ++++
 .../redis/common/mapper/row/DecrByMapper.java      |  31 ++++
 .../redis/common/mapper/row/HSetMapper.java        |  32 ++++
 .../redis/common/mapper/row/IncrByExMapper.java    |  36 ++++
 .../redis/common/mapper/row/IncrByMapper.java      |  32 ++++
 .../redis/common/mapper/row/LPushMapper.java       |  32 ++++
 .../redis/common/mapper/row/PfAddMapper.java       |  32 ++++
 .../redis/common/mapper/row/RPushMapper.java       |  32 ++++
 .../redis/common/mapper/row/RowRedisMapper.java    | 109 ++++++++++++
 .../redis/common/mapper/row/SAddMapper.java        |  32 ++++
 .../redis/common/mapper/row/SetExMapper.java       |  36 ++++
 .../redis/common/mapper/row/SetMapper.java         |  32 ++++
 .../redis/common/mapper/row/ZAddMapper.java        |  32 ++++
 .../connectors/redis/descriptor/Redis.java         | 116 +++++++++++++
 .../redis/descriptor/RedisVadidator.java           |  36 ++++
 ...ng.connectors.redis.common.hanlder.RedisHandler |  29 ++++
 .../org.apache.flink.table.factories.TableFactory  |  16 ++
 .../connectors/redis/RedisDescriptorTest.java      |  99 +++++++++++
 .../connectors/redis/common/RedisHandlerTest.java  |  68 ++++++++
 34 files changed, 1748 insertions(+), 1 deletion(-)

diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 46d546b..2f9232a 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -43,6 +43,11 @@ under the License.
             
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
+        <!-- Bridge between the Table API and the DataStream API; the Scala suffix follows the build property like the other Flink artifacts in this pom -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>redis.clients</groupId>
@@ -65,6 +70,30 @@ under the License.
             <type>test-jar</type>
         </dependency>
 
+        <!-- Table API for Java: https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <!-- Old planner (the one that was available before Flink 1.9) -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <!-- Classes shared by the table modules (descriptors, factories) -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index e6fb355..4dfa61b 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -174,6 +174,18 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
                 
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), 
key, value,
                         optAdditionalTTL.orElse(this.additionalTTL));
                 break;
+            case INCRBY:
+                this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
+                break;
+            case INCRBY_EX:
+                this.redisCommandsContainer.incrByEx(key, Long.valueOf(value), 
optAdditionalTTL.orElse(this.additionalTTL));
+                break;
+            case DECRBY:
+                this.redisCommandsContainer.decrBy(key, Long.valueOf(value));
+                break;
+            case DESCRBY_EX:
+                this.redisCommandsContainer.decrByEx(key, Long.valueOf(value), 
optAdditionalTTL.orElse(this.additionalTTL));
+                break;
             default:
                 throw new IllegalArgumentException("Cannot process such data 
type: " + redisCommand);
         }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
new file mode 100644
index 0000000..4947fe0
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Upsert table sink that writes the rows of a table to Redis, so Redis can be
+ * used as a table in a SQL environment.
+ *
+ * <p>The sink is built from a flat property map: the map is used to look up a
+ * {@link RedisMapperHandler} and a {@link FlinkJedisConfigHandler} through
+ * {@link RedisHandlerServices}, and the table schema is read from the same map.
+ *
+ * @author Ameng
+ */
+public class RedisTableSink implements UpsertStreamTableSink<Row> {
+
+    private Map<String, String> properties;
+    private RedisMapper redisMapper;
+    private FlinkJedisConfigBase flinkJedisConfigBase;
+    private TableSchema tableSchema;
+    private String[] keyFields;
+    private boolean isAppendOnly;
+
+    /**
+     * Creates the sink from the given properties.
+     *
+     * @param properties properties describing the redis mapper, the jedis config and the table schema
+     * @throws NullPointerException if {@code properties} is null
+     */
+    public RedisTableSink(Map<String, String> properties) {
+        this.properties = Preconditions.checkNotNull(properties, "properties should not be null");
+
+        // Both collaborators are looked up through the handler services using the same properties.
+        this.redisMapper = RedisHandlerServices
+                .findRedisHandler(RedisMapperHandler.class, properties)
+                .createRedisMapper(properties);
+        this.flinkJedisConfigBase = RedisHandlerServices
+                .findRedisHandler(FlinkJedisConfigHandler.class, properties)
+                .createFlinkJedisConfig(properties);
+
+        final DescriptorProperties schemaProperties = new DescriptorProperties(true);
+        schemaProperties.putProperties(properties);
+        this.tableSchema = schemaProperties.getTableSchema(SCHEMA);
+    }
+
+    /**
+     * Attaches a {@link RedisSink} to the stream, using the parallelism of the
+     * incoming stream and a runtime name generated from this class and its field names.
+     *
+     * @param dataStream stream of (flag, row) pairs to write
+     * @return the sink transformation that was added to the stream
+     */
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        final DataStreamSink<?> redisSink = dataStream.addSink(new RedisSink(flinkJedisConfigBase, redisMapper));
+        return redisSink
+                .setParallelism(dataStream.getParallelism())
+                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+    }
+
+    /**
+     * Delegates to {@link #consumeDataStream(DataStream)} and discards its result.
+     *
+     * @param dataStream stream of (flag, row) pairs to write
+     */
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    /**
+     * Returns a new sink built from this sink's properties. The given field
+     * names and types are not used; the schema comes from the properties.
+     *
+     * @param fieldNames ignored
+     * @param fieldTypes ignored
+     * @return a new {@link RedisTableSink} created from {@link #getProperties()}
+     */
+    @Override
+    public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+        return new RedisTableSink(getProperties());
+    }
+
+    /**
+     * @return the table schema read from the properties at construction time
+     */
+    @Override
+    public TableSchema getTableSchema() {
+        return this.tableSchema;
+    }
+
+    /**
+     * @return the row type derived from the table schema
+     */
+    @Override
+    public TypeInformation<Row> getRecordType() {
+        return this.tableSchema.toRowType();
+    }
+
+    /**
+     * Stores the key fields handed over by the planner.
+     *
+     * @param keys the key field names
+     */
+    @Override
+    public void setKeyFields(String[] keys) {
+        this.keyFields = keys;
+    }
+
+    /**
+     * Stores whether the table is append-only.
+     *
+     * @param isAppendOnly append-only flag
+     */
+    @Override
+    public void setIsAppendOnly(Boolean isAppendOnly) {
+        this.isAppendOnly = isAppendOnly;
+    }
+
+    /**
+     * @return the properties this sink currently holds
+     */
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    /**
+     * Replaces the stored properties. The mapper, jedis config and schema that
+     * were created in the constructor are not rebuilt.
+     *
+     * @param properties the new properties
+     */
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
new file mode 100644
index 0000000..2ad5ed3
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MASTER_NAME;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_SENTINEL;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * Table sink factory that creates a {@link RedisTableSink}.
+ *
+ * @author Ameng
+ */
+public class RedisTableSinkFactory implements StreamTableSinkFactory<Tuple2<Boolean, Row>> {
+
+    /**
+     * Creates the redis table sink from the given properties.
+     *
+     * @param properties properties handed to the {@link RedisTableSink} constructor
+     * @return a new {@link RedisTableSink}
+     */
+    @Override
+    public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
+        return new RedisTableSink(properties);
+    }
+
+    /**
+     * Context that has to match for this factory to be chosen: the connector type must be redis.
+     *
+     * @return a mutable map with the single required entry
+     */
+    @Override
+    public Map<String, String> requiredContext() {
+        final Map<String, String> context = new HashMap<>();
+        context.put(CONNECTOR_TYPE, REDIS);
+        return context;
+    }
+
+    /**
+     * Property keys this factory accepts: the redis keys, the indexed schema
+     * keys, and the format and connector wildcards.
+     *
+     * @return a mutable list of supported property keys
+     */
+    @Override
+    public List<String> supportedProperties() {
+        final String indexedSchemaPrefix = SCHEMA + ".#.";
+        final List<String> supported = new ArrayList<>();
+
+        // redis
+        supported.add(REDIS_MODE);
+        supported.add(REDIS_COMMAND);
+        supported.add(REDIS_NODES);
+        supported.add(REDIS_MASTER_NAME);
+        supported.add(REDIS_SENTINEL);
+        supported.add(REDIS_KEY_TTL);
+
+        // schema
+        supported.add(indexedSchemaPrefix + SCHEMA_TYPE);
+        supported.add(indexedSchemaPrefix + SCHEMA_NAME);
+        supported.add(indexedSchemaPrefix + SCHEMA_FROM);
+
+        // format and connector wildcards
+        supported.add(FORMAT + ".*");
+        supported.add(CONNECTOR + ".*");
+        return supported;
+    }
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
new file mode 100644
index 0000000..b48db1b
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config.handler;
+
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_CLUSTER;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Jedis cluster config handler: selected when the redis mode is cluster, it
+ * creates a {@link FlinkJedisClusterConfig} from the configured node list.
+ *
+ * @author Ameng
+ */
+public class FlinkJedisClusterConfigHandler implements FlinkJedisConfigHandler {
+
+    public FlinkJedisClusterConfigHandler() {
+    }
+
+    /**
+     * Builds the cluster config from the comma separated {@code host:port}
+     * entries stored under the nodes property.
+     *
+     * @param properties properties holding the node list
+     * @return a cluster config containing every parsed node
+     * @throws IllegalArgumentException if the node list is missing or null, an entry
+     *         is not in {@code host:port} form, or a port is not a number
+     */
+    @Override
+    public FlinkJedisConfigBase createFlinkJedisConfig(Map<String, String> properties) {
+        String nodesInfo = properties.get(REDIS_NODES);
+        // Check the value rather than the key: a key mapped to null is as unusable as a missing key.
+        Preconditions.checkArgument(nodesInfo != null, "nodes should not be null in cluster mode");
+        Set<InetSocketAddress> nodes = Arrays.stream(nodesInfo.split(","))
+                .map(FlinkJedisClusterConfigHandler::parseNode)
+                .collect(Collectors.toSet());
+        return new FlinkJedisClusterConfig.Builder()
+                .setNodes(nodes).build();
+    }
+
+    /**
+     * Context that has to match for this handler to be chosen: the redis mode must be cluster.
+     *
+     * @return a mutable map with the single required entry
+     */
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> require = new HashMap<>();
+        require.put(REDIS_MODE, REDIS_CLUSTER);
+        return require;
+    }
+
+    /** Parses one {@code host:port} entry, failing with a message that names the offending entry. */
+    private static InetSocketAddress parseNode(String node) {
+        String[] hostAndPort = node.split(":");
+        if (hostAndPort.length < 2) {
+            throw new IllegalArgumentException(
+                    "redis cluster node '" + node + "' should be in host:port format");
+        }
+        return new InetSocketAddress(hostAndPort[0].trim(), Integer.parseInt(hostAndPort[1].trim()));
+    }
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
new file mode 100644
index 0000000..fb852de
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config.handler;
+
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MASTER_NAME;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_SENTINEL;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.SENTINELS_INFO;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.SENTINELS_PASSWORD;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+
+/**
+ * Jedis sentinel config handler: selected when the redis mode is sentinel, it
+ * creates a {@link FlinkJedisSentinelConfig} from the master name, the sentinel
+ * list and an optional password.
+ */
+public class FlinkJedisSentinelConfigHandler implements FlinkJedisConfigHandler {
+
+    public FlinkJedisSentinelConfigHandler() {
+
+    }
+
+    /**
+     * Builds the sentinel config from the given properties.
+     *
+     * @param properties properties holding the master name, the comma separated
+     *        sentinel list and, optionally, the password
+     * @return the sentinel config
+     * @throws NullPointerException if the master name or the sentinel list is absent
+     */
+    @Override
+    public FlinkJedisConfigBase createFlinkJedisConfig(Map<String, String> properties) {
+        // Plain lookups: Map#computeIfAbsent with a null mapping function throws
+        // NullPointerException unconditionally, which made this method unusable.
+        String masterName = properties.get(REDIS_MASTER_NAME);
+        String sentinelsInfo = properties.get(SENTINELS_INFO);
+        Objects.requireNonNull(masterName, "master should not be null in sentinel mode");
+        Objects.requireNonNull(sentinelsInfo, "sentinels should not be null in sentinel mode");
+        // Trim each entry so "host1:26379, host2:26379" is accepted, as the cluster handler does for its nodes.
+        Set<String> sentinels = Arrays.stream(sentinelsInfo.split(","))
+                .map(String::trim)
+                .collect(Collectors.toSet());
+        String sentinelsPassword = properties.get(SENTINELS_PASSWORD);
+        // A blank password is treated the same as no password.
+        if (sentinelsPassword != null && sentinelsPassword.trim().isEmpty()) {
+            sentinelsPassword = null;
+        }
+        return new FlinkJedisSentinelConfig.Builder()
+                .setMasterName(masterName).setSentinels(sentinels).setPassword(sentinelsPassword)
+                .build();
+    }
+
+    /**
+     * Context that has to match for this handler to be chosen: the redis mode must be sentinel.
+     *
+     * @return a mutable map with the single required entry
+     */
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> require = new HashMap<>();
+        require.put(REDIS_MODE, REDIS_SENTINEL);
+        return require;
+    }
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 886b94f..de0dfaa 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -188,6 +188,66 @@ public class RedisClusterContainer implements 
RedisCommandsContainer, Closeable
         }
     }
 
+    @Override
+    public void incrByEx(String key, Long value, Integer ttl) {
+        try {
+            jedisCluster.incrBy(key, value);
+            if (ttl != null) {
+                jedisCluster.expire(key, ttl);
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command incrby and 
ttl to key {} with increment {} and tll {} error message {}",
+                        key, value, ttl, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Decreases the value at {@code key} by {@code value} on the cluster and,
+     * when {@code ttl} is not null, sets that expiry on the key afterwards.
+     * Any failure is logged and rethrown.
+     */
+    @Override
+    public void decrByEx(String key, Long value, Integer ttl) {
+        try {
+            jedisCluster.decrBy(key, value);
+            if (ttl != null) {
+                jedisCluster.expire(key, ttl);
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // The message names the command actually sent (decrby) and calls the amount a decrement.
+                LOG.error("Cannot send Redis message with command decrby and ttl to key {} with decrement {} and ttl {} error message {}",
+                        key, value, ttl, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+
+    /**
+     * Increases the value at {@code key} by {@code value} on the cluster.
+     * Any failure is logged and rethrown.
+     */
+    @Override
+    public void incrBy(String key, Long value) {
+        try {
+            jedisCluster.incrBy(key, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // Three placeholders for three arguments; there is no ttl in this command.
+                LOG.error("Cannot send Redis message with command incrby to key {} with increment {} error message {}",
+                        key, value, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Decreases the value at {@code key} by {@code value} on the cluster.
+     * Any failure is logged and rethrown.
+     */
+    @Override
+    public void decrBy(String key, Long value) {
+        try {
+            jedisCluster.decrBy(key, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // The message names the command actually sent (decrby).
+                LOG.error("Cannot send Redis message with command decrby to key {} with decrement {} error message {}",
+                        key, value, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+
     /**
      * Closes the {@link JedisCluster}.
      */
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 486784b..8adbd8d 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -127,6 +127,37 @@ public interface RedisCommandsContainer extends 
Serializable {
      */
     void zrem(String key, String element);
 
+
+    /**
+     * Increases the value held at the specified key and then expires the key
+     * after a fixed time.
+     *
+     * @param key the key whose value is increased
+     * @param value the amount to add
+     * @param ttl time to live (TTL) applied to the key
+     */
+    void incrByEx(String key, Long value, Integer ttl);
+
+    /**
+     * Decreases the value held at the specified key and then expires the key.
+     *
+     * @param key the key whose value is decreased
+     * @param value the amount to subtract
+     * @param ttl time to live (TTL) applied to the key
+     */
+    void decrByEx(String key, Long value, Integer ttl);
+
+    /**
+     * Increases the value held at the specified key.
+     *
+     * @param key the key whose value is increased
+     * @param value the amount to add
+     */
+    void incrBy(String key, Long value);
+
+    /**
+     * Decreases the value held at the specified key.
+     *
+     * @param key the key whose value is decreased
+     * @param value the amount to subtract
+     */
+    void decrBy(String key, Long value);
+
     /**
      * Close the Jedis container.
      *
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 4af84d1..bde54b5 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -286,4 +286,78 @@ public class RedisContainer implements 
RedisCommandsContainer, Closeable {
             LOG.error("Failed to close (return) instance to pool", e);
         }
     }
+
+    /**
+     * Increases the value at {@code key} by {@code value} and, when {@code ttl}
+     * is not null, sets that expiry on the key afterwards. Any failure is logged
+     * and rethrown; the Jedis instance is released in every case.
+     */
+    @Override
+    public void incrByEx(String key, Long value, Integer ttl) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.incrBy(key, value);
+            if (ttl != null) {
+                jedis.expire(key, ttl);
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // One placeholder per argument, so the key is no longer printed as the increment
+                // and the exception message is no longer dropped.
+                LOG.error("Cannot send Redis with incrby command to key {} with increment {} with ttl {} error message {}",
+                        key, value, ttl, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    /**
+     * Decreases the value at {@code key} by {@code value} and, when {@code ttl}
+     * is not null, sets that expiry on the key afterwards. Any failure is logged
+     * and rethrown; the Jedis instance is released in every case.
+     */
+    @Override
+    public void decrByEx(String key, Long value, Integer ttl) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.decrBy(key, value);
+            if (ttl != null) {
+                jedis.expire(key, ttl);
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // One placeholder per argument, so the key is no longer printed as the decrement
+                // and the exception message is no longer dropped.
+                LOG.error("Cannot send Redis with decrBy command to key {} with decrement {} with ttl {} error message {}",
+                        key, value, ttl, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    /**
+     * Increases the value at {@code key} by {@code value}. Any failure is logged
+     * and rethrown; the Jedis instance is released in every case.
+     */
+    @Override
+    public void incrBy(String key, Long value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.incrBy(key, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // One placeholder per argument, so the key is no longer printed as the increment
+                // and the exception message is no longer dropped.
+                LOG.error("Cannot send Redis with incrby command to key {} with increment {} error message {}",
+                        key, value, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    /**
+     * Decreases the value at {@code key} by {@code value}. Any failure is logged
+     * and rethrown; the Jedis instance is released in every case.
+     */
+    @Override
+    public void decrBy(String key, Long value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.decrBy(key, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                // One placeholder per argument, and the amount is called what it is: a decrement.
+                LOG.error("Cannot send Redis with decrBy command to key {} with decrement {} error message {}",
+                        key, value, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
 }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/FlinkJedisConfigHandler.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/FlinkJedisConfigHandler.java
new file mode 100644
index 0000000..d1969f3
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/FlinkJedisConfigHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import java.util.Map;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+
+/**
+ * Handler to create a flink jedis config.
+ *
+ * @author Ameng
+ */
+public interface FlinkJedisConfigHandler extends RedisHandler  {
+
+    /**
+     * Creates a flink jedis config using the specified properties.
+     *
+     * @param properties used to create flink jedis config
+     * @return flink jedis config
+     */
+    FlinkJedisConfigBase createFlinkJedisConfig(Map<String, String> 
properties);
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandler.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandler.java
new file mode 100644
index 0000000..eb33782
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Redis handler to create redis mapper and flink jedis config.
+ * @author Ameng.
+ */
+public interface RedisHandler extends Serializable {
+
+    /**
+     * Required context used by the SPI lookup to find this redis handler.
+     * @return properties to find correct redis handler.
+     */
+    Map<String, String> requiredContext();
+
+    /**
+     * Supported properties used for this redis handler.
+     * @return supported properties list; the default implementation returns an empty list
+     * @throws Exception declared by the signature; the default implementation does not throw
+     */
+    default List<String> supportProperties() throws Exception {
+        return Collections.emptyList();
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandlerServices.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandlerServices.java
new file mode 100644
index 0000000..ba8ff71
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandlerServices.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unified class to search for a {@link RedisHandler} of provided type and properties.
+ * Handlers are discovered through {@link ServiceLoader}, then filtered by class and required context.
+ * @author Ameng.
+ * @param <T> redis handler type (the static methods of this class declare their own type parameter).
+ */
+public class RedisHandlerServices<T> {
+
+    private static final ServiceLoader<RedisHandler> defaultLoader = 
ServiceLoader.load(RedisHandler.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(RedisHandlerServices.class);
+
+    /**
+     * Finds the redis handler that implements the given class and whose required context matches the properties.
+     * @param RedisHanlderClass specified redis handler class.
+     * @param meta properties to search redis handler; must not be null.
+     * @param <T> redis handler type.
+     * @return the first matching redis handler.
+     */
+    public static <T extends RedisHandler> T findRedisHandler(Class<T> 
RedisHanlderClass, Map<String, String> meta) {
+        Preconditions.checkNotNull(meta);
+        return findSingRedisHandler(RedisHanlderClass, meta, Optional.empty());
+    }
+
+
+    /**
+     * Discovers all redis handlers, filters them by class and context, and returns the first match.
+     * @param RedisHanlderClass specified redis handler class.
+     * @param meta properties to search redis handler
+     * @param classLoader class loader to load redis handler class; the shared default loader is used when empty
+     * @param <T> redis handler
+     * @return first matched redis handler
+     */
+    private static <T extends RedisHandler> T findSingRedisHandler(
+            Class<T> RedisHanlderClass,
+            Map<String, String> meta,
+            Optional<ClassLoader> classLoader) {
+
+        List<RedisHandler> redisHandlers = discoverRedisHanlder(classLoader);
+        List<T> filtered = filter(redisHandlers, RedisHanlderClass, meta);
+
+        return filtered.get(0);
+    }
+
+
+    /**
+     * Filters the discovered redis handlers by handler class and then by matching required context.
+     */
+    private static <T extends RedisHandler> List<T> filter(
+            List<RedisHandler> redis,
+            Class<T> redisClass,
+            Map<String, String> meta) {
+
+        Preconditions.checkNotNull(redisClass);
+        Preconditions.checkNotNull(meta);
+
+        List<T> redisFactories = filterByFactoryClass(
+                redisClass,
+                redis);
+
+        List<T> contextFactories = filterByContext(
+                meta,
+                redisFactories);
+        return contextFactories;
+    }
+
+    /**
+     * Searches for redis handlers using Java service providers.
+     * @param classLoader loader to search with; the shared default loader is used when empty
+     * @return all redis handlers found; a loading failure is rethrown as {@link TableException}
+     */
+    private static List<RedisHandler> 
discoverRedisHanlder(Optional<ClassLoader> classLoader) {
+        try {
+            List<RedisHandler> result = new LinkedList<>();
+            if (classLoader.isPresent()) {
+                ServiceLoader
+                        .load(RedisHandler.class, classLoader.get())
+                        .iterator()
+                        .forEachRemaining(result::add);
+            } else {
+                defaultLoader.iterator().forEachRemaining(result::add);
+            }
+            return result;
+        } catch (ServiceConfigurationError e) {
+            LOG.error("Could not load service provider for redis handler.", e);
+            throw new TableException("Could not load service provider for 
redis handler.", e);
+        }
+
+    }
+
+    /**
+     * Keeps the redis handlers assignable to the given class; throws a RuntimeException when none is.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> List<T> filterByFactoryClass(
+            Class<T> redisClass,
+            List<RedisHandler> redis) {
+
+        List<RedisHandler> redisList = redis.stream()
+                .filter(p -> redisClass.isAssignableFrom(p.getClass()))
+                .collect(Collectors.toList());
+
+        if (redisList.isEmpty()) {
+            throw new RuntimeException(
+                    String.format("No redis hanlder implements '%s'.", 
redisClass.getCanonicalName()));
+        }
+
+        return (List<T>) redisList;
+    }
+
+    /**
+     * Keeps the handlers whose every required context entry is present with an equal value in meta.
+     *
+     * @return all matching handlers; a RuntimeException is thrown when there is none
+     */
+    private static <T extends RedisHandler> List<T> filterByContext(
+            Map<String, String> meta,
+            List<T> redisList) {
+
+        List<T> matchingredis = redisList.stream().filter(factory -> {
+            Map<String, String> requestedContext = normalizeContext(factory);
+
+            Map<String, String> plainContext = new HashMap<>(requestedContext);
+
+            // check if required context is met
+            return plainContext.keySet()
+                    .stream()
+                    .allMatch(e -> meta.containsKey(e) && 
meta.get(e).equals(plainContext.get(e)));
+        }).collect(Collectors.toList());
+
+        if (matchingredis.isEmpty()) {
+            throw new RuntimeException("no match redis");
+        }
+
+        return matchingredis;
+    }
+
+    /**
+     * Returns the handler's required context with lower-cased keys; a null context is rejected.
+     */
+    private static Map<String, String> normalizeContext(RedisHandler redis) {
+        Map<String, String> requiredContext = redis.requiredContext();
+        if (requiredContext == null) {
+            throw new RuntimeException(
+                    String.format("Required context of redis '%s' must not be 
null.", redis.getClass().getName()));
+        }
+        return requiredContext.keySet().stream()
+                .collect(Collectors.toMap(String::toLowerCase, 
requiredContext::get));
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
new file mode 100644
index 0000000..a0b087c
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler that creates a redis mapper.
+ * @author Ameng .
+ */
+public interface RedisMapperHandler extends RedisHandler {
+
+    Logger LOGGER = LoggerFactory.getLogger(RedisMapperHandler.class);
+
+    /**
+     * Creates a redis mapper by reflectively instantiating the runtime class of this handler.
+     * When the {@code REDIS_KEY_TTL} property is absent the public no-arg constructor is used,
+     * otherwise the public {@code (Integer)} constructor is called with the parsed TTL.
+     * @param properties to create redis mapper.
+     * @return redis mapper.
+     * @throws RuntimeException if the mapper cannot be instantiated or the TTL is not an integer.
+     */
+    default RedisMapper createRedisMapper(Map<String, String> properties) {
+        String ttl = properties.get(REDIS_KEY_TTL);
+        try {
+            // Use the runtime class directly: Class.forName(getCanonicalName()) fails for nested
+            // classes (canonical name uses '.', not '$') and for local/anonymous classes (null name).
+            Class<?> redisMapper = this.getClass();
+
+            if (ttl == null) {
+                return (RedisMapper) redisMapper.getConstructor().newInstance();
+            }
+            Constructor<?> c = redisMapper.getConstructor(Integer.class);
+            return (RedisMapper) c.newInstance(Integer.parseInt(ttl));
+        } catch (Exception e) {
+            LOGGER.error("create redis mapper failed", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index d465e83..282d15a 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -75,7 +75,28 @@ public enum RedisCommand {
      * Sets field in the hash stored at key to value. If key does not exist,
      * a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
      */
-    HSET(RedisDataType.HASH);
+    HSET(RedisDataType.HASH),
+
+    /**
+     * Increments the value of the specified key by a delta.
+     */
+    INCRBY(RedisDataType.STRING),
+
+    /**
+     * Increments the value of the specified key by a delta and expires the key after a fixed time.
+     */
+    INCRBY_EX(RedisDataType.STRING),
+
+    /**
+     * Decreases the value of the specified key by a fixed number.
+     */
+    DECRBY(RedisDataType.STRING),
+
+    /**
+     * Decreases the value of the specified key by a fixed number and expires the key after a fixed time.
+     */
+    DESCRBY_EX(RedisDataType.STRING);
+
 
     /**
      * The {@link RedisDataType} this command belongs to.
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
index 3284361..ba1d8b7 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -83,6 +83,18 @@ public class RedisCommandDescription implements Serializable 
{
                 throw new IllegalArgumentException("SETEX command should have 
time to live (TTL)");
             }
         }
+
+        if (redisCommand.equals(RedisCommand.INCRBY_EX)) {
+            if (additionalTTL == null) {
+                throw new IllegalArgumentException("INCRBY_EX command should 
have time to live (TTL)");
+            }
+        }
+
+        // DESCRBY_EX expires the key, so a TTL is mandatory; the message names the command being checked.
+        if (redisCommand.equals(RedisCommand.DESCRBY_EX)) {
+            if (additionalTTL == null) {
+                throw new IllegalArgumentException("DESCRBY_EX command should have time to live (TTL)");
+            }
+        }
     }
 
     /**
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByExMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByExMapper.java
new file mode 100644
index 0000000..0844079
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByExMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#DESCRBY_EX} (decrease with expire).
+ * @author Ameng .
+ */
+public class DecrByExMapper extends RowRedisMapper {
+
+    public DecrByExMapper() {
+        super(RedisCommand.DESCRBY_EX);
+    }
+
+    public DecrByExMapper(Integer ttl) {
+        super(ttl, RedisCommand.DESCRBY_EX);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByMapper.java
new file mode 100644
index 0000000..071eed4
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#DECRBY} (decrease operation).
+ */
+public class DecrByMapper extends RowRedisMapper {
+
+    public DecrByMapper() {
+        super(RedisCommand.DECRBY);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HSetMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HSetMapper.java
new file mode 100644
index 0000000..1cea53f
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HSetMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#HSET}.
+ * @author Ameng .
+ */
+public class HSetMapper extends RowRedisMapper {
+
+    public HSetMapper() {
+        super(RedisCommand.HSET);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByExMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByExMapper.java
new file mode 100644
index 0000000..0621334
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByExMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#INCRBY_EX} (increment with key expiry).
+ * @author Ameng .
+ */
+public class IncrByExMapper extends RowRedisMapper {
+
+    public IncrByExMapper() {
+        super(RedisCommand.INCRBY_EX);
+    }
+
+    public IncrByExMapper(Integer ttl) {
+        super(ttl, RedisCommand.INCRBY_EX);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByMapper.java
new file mode 100644
index 0000000..b0bd567
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#INCRBY} (increment operation).
+ * @author Ameng .
+ */
+public class IncrByMapper extends RowRedisMapper {
+
+    public IncrByMapper() {
+        super(RedisCommand.INCRBY);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/LPushMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/LPushMapper.java
new file mode 100644
index 0000000..0f5e39c
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/LPushMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#LPUSH}.
+ * @author Ameng .
+ */
+public class LPushMapper extends RowRedisMapper {
+
+    public LPushMapper() {
+        super(RedisCommand.LPUSH);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/PfAddMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/PfAddMapper.java
new file mode 100644
index 0000000..7596520
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/PfAddMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#PFADD}.
+ * @author Ameng .
+ */
+public class PfAddMapper extends RowRedisMapper {
+
+    public PfAddMapper() {
+        super(RedisCommand.PFADD);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RPushMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RPushMapper.java
new file mode 100644
index 0000000..b0d1a6d
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RPushMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row redis mapper bound to {@link RedisCommand#RPUSH}.
+ * @author Ameng .
+ */
+public class RPushMapper extends RowRedisMapper {
+
+    public RPushMapper() {
+        super(RedisCommand.RPUSH);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
new file mode 100644
index 0000000..fe294b8
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base row redis mapper implementation.
+ * Maps {@code Tuple2<Boolean, Row>} records to a redis command: row field 0 is used as the key
+ * and row field 1 as the value. The mapper also acts as a {@link RedisMapperHandler} that is
+ * looked up by its redis command name.
+ * @author Ameng .
+ */
+public abstract class RowRedisMapper implements RedisMapper<Tuple2<Boolean, Row>>, RedisMapperHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RowRedisMapper.class);
+
+    /** Optional time to live; {@code null} when no TTL was configured. */
+    private Integer ttl;
+
+    private RedisCommand redisCommand;
+
+    /**
+     * @return the configured TTL
+     * @throws NullPointerException if no TTL was configured (the boxed value is unboxed)
+     */
+    public int getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(int ttl) {
+        this.ttl = ttl;
+    }
+
+    public RedisCommand getRedisCommand() {
+        return redisCommand;
+    }
+
+    public void setRedisCommand(RedisCommand redisCommand) {
+        this.redisCommand = redisCommand;
+    }
+
+    public RowRedisMapper() {
+    }
+
+    public RowRedisMapper(int ttl, RedisCommand redisCommand) {
+        this.ttl = ttl;
+        this.redisCommand = redisCommand;
+    }
+
+    public RowRedisMapper(RedisCommand redisCommand) {
+        this.redisCommand = redisCommand;
+    }
+
+    /**
+     * Builds the command description, passing the TTL only when one was configured.
+     */
+    @Override
+    public RedisCommandDescription getCommandDescription() {
+        if (ttl != null) {
+            return new RedisCommandDescription(redisCommand, ttl);
+        }
+        return new RedisCommandDescription(redisCommand);
+    }
+
+    /** Uses the string form of row field 0 as the redis key. */
+    @Override
+    public String getKeyFromData(Tuple2<Boolean, Row> data) {
+        return data.f1.getField(0).toString();
+    }
+
+    /** Uses the string form of row field 1 as the redis value. */
+    @Override
+    public String getValueFromData(Tuple2<Boolean, Row> data) {
+        return data.f1.getField(1).toString();
+    }
+
+    /** Requires the {@code REDIS_COMMAND} property to equal this mapper's command name. */
+    @Override
+    public Map<String, String> requiredContext() {
+        Map<String, String> require = new HashMap<>();
+        require.put(REDIS_COMMAND, getRedisCommand().name());
+        return require;
+    }
+
+    /**
+     * Two row mappers are equal when they are bound to the same redis command.
+     * Returns {@code false} for {@code null} and for objects of another type.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof RowRedisMapper)) {
+            return false;
+        }
+        return this.redisCommand == ((RowRedisMapper) obj).redisCommand;
+    }
+
+    /** Consistent with {@link #equals(Object)}: derived from the redis command only. */
+    @Override
+    public int hashCode() {
+        return redisCommand == null ? 0 : redisCommand.hashCode();
+    }
+
+    /**
+     * @return the configured TTL, or an empty optional when none was configured
+     */
+    @Override
+    public Optional<Integer> getAdditionalTTL(Tuple2<Boolean, Row> data) {
+        // Read the field directly: getTtl() unboxes and would throw when no TTL is set.
+        return Optional.ofNullable(ttl);
+    }
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SAddMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SAddMapper.java
new file mode 100644
index 0000000..cee1347
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SAddMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row mapper for the Redis SADD operation.
+ *
+ * @author Ameng .
+ */
+public class SAddMapper extends RowRedisMapper {
+
+    /** Creates a mapper bound to {@code RedisCommand.SADD}. */
+    public SAddMapper() {
+        super(RedisCommand.SADD);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetExMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetExMapper.java
new file mode 100644
index 0000000..a0f8bbb
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetExMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row mapper for the Redis SETEX operation (SET with an expiry on the key).
+ *
+ * @author Ameng.
+ */
+public class SetExMapper extends RowRedisMapper {
+
+    /** Creates a mapper bound to {@code RedisCommand.SETEX} without passing a TTL to the parent. */
+    public SetExMapper() {
+        super(RedisCommand.SETEX);
+    }
+
+    /**
+     * Creates a mapper bound to {@code RedisCommand.SETEX} with the given TTL.
+     *
+     * @param ttl TTL forwarded to the parent constructor. NOTE(review): that constructor
+     *            takes a primitive {@code int}, so a {@code null} argument fails on unboxing.
+     */
+    public SetExMapper(Integer ttl) {
+        super(ttl, RedisCommand.SETEX);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetMapper.java
new file mode 100644
index 0000000..9e768c1
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row mapper for the Redis SET operation.
+ *
+ * @author Ameng .
+ */
+public class SetMapper extends RowRedisMapper {
+
+    /** Creates a mapper bound to {@code RedisCommand.SET}. */
+    public SetMapper() {
+        super(RedisCommand.SET);
+    }
+
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java
new file mode 100644
index 0000000..3e5b7ed
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * Row mapper for the Redis ZADD operation.
+ *
+ * @author Ameng .
+ */
+public class ZAddMapper extends RowRedisMapper {
+
+    /** Creates a mapper bound to {@code RedisCommand.ZADD}. */
+    public ZAddMapper() {
+        super(RedisCommand.ZADD);
+    }
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
new file mode 100644
index 0000000..af21163
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.descriptor;
+
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_CLUSTER;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MASTER_NAME;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_SENTINEL;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Connector descriptor used to register Redis as a table connector.
+ *
+ * <p>Collects the Redis command, an optional key TTL, the connection mode and any extra
+ * connection properties, and exposes them as connector properties after validation.
+ *
+ * @author Ameng .
+ */
+public class Redis extends ConnectorDescriptor {
+
+    Map<String, String> properties = new HashMap<>();
+
+    private String mode = null;
+    private String redisCommand = null;
+    private Integer ttl;
+
+    /**
+     * Creates a descriptor with an explicit version and format requirement.
+     *
+     * @param type         not used; the connector type passed to the parent is always {@code REDIS}
+     * @param version      connector property version
+     * @param formatNeeded whether the connector requires a format descriptor
+     */
+    public Redis(String type, int version, boolean formatNeeded) {
+        super(REDIS, version, formatNeeded);
+    }
+
+    /**
+     * Creates a descriptor with version 1 and no format requirement.
+     */
+    public Redis() {
+        this(REDIS, 1, false);
+    }
+
+    /**
+     * redis operation type.
+     * @param redisCommand redis operation type
+     * @return this descriptor.
+     */
+    public Redis command(String redisCommand) {
+        this.redisCommand = redisCommand;
+        properties.put(REDIS_COMMAND, redisCommand);
+        return this;
+    }
+
+    /**
+     * ttl for specified key.
+     * @param ttl time for key.
+     * @return this descriptor
+     */
+    public Redis ttl(Integer ttl) {
+        this.ttl = ttl;
+        properties.put(REDIS_KEY_TTL, String.valueOf(ttl));
+        return this;
+    }
+
+    /**
+     * redis mode to connect a specified redis cluster
+     * @param mode redis mode
+     * @return this descriptor
+     */
+    public Redis mode(String mode) {
+        this.mode = mode;
+        properties.put(REDIS_MODE, mode);
+        return this;
+    }
+
+    /**
+     * add properties used to connect to redis.
+     * @param k specified key
+     * @param v value for specified key
+     * @return this descriptor
+     */
+    public Redis property(String k, String v) {
+        properties.put(k, v);
+        return this;
+    }
+
+    /**
+     * Validates the descriptor and returns its properties.
+     *
+     * @return the descriptor's property map (the internal instance, not a copy)
+     */
+    @Override
+    protected Map<String, String> toConnectorProperties() {
+        validate();
+        return properties;
+    }
+
+    /**
+     * validate the necessary properties for redis descriptor.
+     *
+     * <p>A command is always required. Cluster mode additionally requires the node list;
+     * sentinel mode requires the master name and the sentinel address list. When no mode
+     * was set, only the command is checked.
+     *
+     * @throws IllegalArgumentException if a required property is missing
+     */
+    public void validate() {
+        Preconditions.checkArgument(properties.containsKey(REDIS_COMMAND), "need specified redis command");
+        // Compare from the constant side: mode stays null when mode(String) was never called.
+        if (REDIS_CLUSTER.equalsIgnoreCase(mode)) {
+            Preconditions.checkArgument(properties.containsKey(REDIS_NODES), "cluster mode need cluster-nodes info");
+        } else if (REDIS_SENTINEL.equalsIgnoreCase(mode)) {
+            Preconditions.checkArgument(properties.containsKey(REDIS_MASTER_NAME), "sentinel mode need master name");
+            // REDIS_SENTINEL is the mode value, not a property key; the sentinel list has its own key.
+            Preconditions.checkArgument(properties.containsKey(RedisVadidator.SENTINELS_INFO), "sentinel mode need sentinel infos");
+        }
+    }
+}
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisVadidator.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisVadidator.java
new file mode 100644
index 0000000..f9b604d
--- /dev/null
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisVadidator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.descriptor;
+
+/**
+ * Property keys and well-known values used by the redis descriptor.
+ *
+ * @author Ameng .
+ */
+public class RedisVadidator {
+
+    /** Connector type identifier. */
+    public static final String REDIS = "redis";
+
+    /** Key of the connection-mode property, followed by the mode values checked by the descriptor. */
+    public static final String REDIS_MODE = "redis-mode";
+    public static final String REDIS_CLUSTER = "cluster";
+    public static final String REDIS_SENTINEL = "sentinel";
+
+    /** Key of the redis command property. */
+    public static final String REDIS_COMMAND = "command";
+
+    /** Key of the key-TTL property. */
+    public static final String REDIS_KEY_TTL = "key.ttl";
+
+    /** Key of the node list required in cluster mode. */
+    public static final String REDIS_NODES = "cluster-nodes";
+
+    /** Sentinel-related property keys. */
+    public static final String REDIS_MASTER_NAME = "master.name";
+    public static final String SENTINELS_INFO = "sentinels.info";
+    public static final String SENTINELS_PASSWORD = "sentinels.password";
+
+}
diff --git 
a/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
 
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
new file mode 100644
index 0000000..7964e86
--- /dev/null
+++ 
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.redis.common.mapper.row.SetExMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.DecrByMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.DecrByExMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.HSetMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.IncrByMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.IncrByExMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.LPushMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.PfAddMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.RPushMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.SAddMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.SetMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.ZAddMapper
+org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisClusterConfigHandler
+org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisSentinelConfigHandler
\ No newline at end of file
diff --git 
a/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..fb97767
--- /dev/null
+++ 
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.redis.RedisTableSinkFactory
\ No newline at end of file
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
new file mode 100644
index 0000000..2caf8f3
--- /dev/null
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
+import org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.types.Row;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Registers a Redis table sink through the {@link Redis} descriptor and runs an
+ * {@code INSERT INTO} job against it.
+ */
+public class RedisDescriptorTest extends RedisITCaseBase {
+
+    private static final String REDIS_KEY = "TEST_KEY";
+
+    /** Number of rows the test source emits before finishing. */
+    private static final int NUM_ELEMENTS = 10;
+
+    StreamExecutionEnvironment env;
+
+    @Before
+    public void setUp() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    /**
+     * Builds a (k, v) row stream, registers it as table {@code t1}, registers a Redis sink
+     * from the descriptor and inserts the stream into it.
+     */
+    @Test
+    public void testRedisDescriptor() throws Exception {
+        DataStreamSource<Row> source = (DataStreamSource<Row>) env.addSource(new TestSourceFunctionString())
+                .returns(new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(Long.class)));
+
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useOldPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
+        tableEnvironment.registerDataStream("t1", source, "k, v");
+
+        Redis redis = new Redis()
+                .mode(RedisVadidator.REDIS_CLUSTER)
+                .command(RedisCommand.INCRBY_EX.name())
+                .ttl(100000)
+                .property(RedisVadidator.REDIS_NODES, REDIS_HOST + ":" + REDIS_PORT);
+
+        tableEnvironment
+                .connect(redis).withSchema(new Schema()
+                .field("k", TypeInformation.of(String.class))
+                .field("v", TypeInformation.of(Long.class)))
+                .registerTableSink("redis");
+
+
+        tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
+        env.execute("Test Redis Table");
+    }
+
+
+    /**
+     * Source that emits a fixed number of identical (REDIS_KEY, 2L) rows and then returns,
+     * so the job started by {@code env.execute} can finish.
+     */
+    private static class TestSourceFunctionString implements SourceFunction<Row> {
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Row> ctx) throws Exception {
+            // Bounded on purpose: an endless source would keep env.execute() from ever returning.
+            for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                Row row = new Row(2);
+                row.setField(0, REDIS_KEY);
+                row.setField(1, 2L);
+                ctx.collect(row);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+}
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
new file mode 100644
index 0000000..54a7fd9
--- /dev/null
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common;
+
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_CLUSTER;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import 
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.row.SetExMapper;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that {@link RedisHandlerServices} resolves the mapper and the Jedis config
+ * handler that match a set of descriptor properties.
+ */
+public class RedisHandlerTest extends AbstractTestBase {
+    public static final Map<String, String> properties = new HashMap<>();
+
+    /** Populates the shared properties: cluster mode, SETEX command, one node and a TTL. */
+    @BeforeClass
+    public static void setUp() {
+        properties.put(REDIS_MODE, REDIS_CLUSTER);
+        properties.put(REDIS_COMMAND, RedisCommand.SETEX.name());
+        properties.put(REDIS_NODES, "localhost:8080");
+        properties.put(REDIS_KEY_TTL, "1000");
+    }
+
+    /** The mapper resolved for the SETEX command must equal a {@link SetExMapper}. */
+    @Test
+    public void testRedisMapper() {
+        RedisMapper redisMapper = RedisHandlerServices.findRedisHandler(RedisMapperHandler.class, properties)
+                .createRedisMapper(properties);
+        SetExMapper expectedMapper = new SetExMapper(1000);
+        // JUnit's contract is assertEquals(expected, actual); keeps failure messages the right way round.
+        assertEquals(expectedMapper, redisMapper);
+    }
+
+    /** Cluster-mode properties must yield a {@link FlinkJedisClusterConfig}. */
+    @Test
+    public void testFlinkJedisConfigHandler() {
+        FlinkJedisConfigBase flinkJedisConfigBase = RedisHandlerServices
+                .findRedisHandler(FlinkJedisConfigHandler.class, properties)
+                .createFlinkJedisConfig(properties);
+        assertTrue(flinkJedisConfigBase instanceof FlinkJedisClusterConfig);
+    }
+}

Reply via email to