This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new f86a478 [BAHIR-220] Add redis descriptor to enable connection as a
table (#72)
f86a478 is described below
commit f86a47877f3e20be6c3d0fcda189474b93806a8d
Author: hzyuemeng1 <[email protected]>
AuthorDate: Sat Mar 14 08:01:30 2020 +0800
[BAHIR-220] Add redis descriptor to enable connection as a table (#72)
Add Redis table sink so we can register it to catalog,
and use Redis as a table in SQL environment
---
flink-connector-redis/pom.xml | 29 ++++
.../streaming/connectors/redis/RedisSink.java | 12 ++
.../streaming/connectors/redis/RedisTableSink.java | 113 +++++++++++++
.../connectors/redis/RedisTableSinkFactory.java | 80 +++++++++
.../handler/FlinkJedisClusterConfigHandler.java | 61 +++++++
.../handler/FlinkJedisSentinelConfigHandler.java | 66 ++++++++
.../common/container/RedisClusterContainer.java | 60 +++++++
.../common/container/RedisCommandsContainer.java | 31 ++++
.../redis/common/container/RedisContainer.java | 74 +++++++++
.../common/hanlder/FlinkJedisConfigHandler.java | 35 ++++
.../redis/common/hanlder/RedisHandler.java | 46 ++++++
.../redis/common/hanlder/RedisHandlerServices.java | 182 +++++++++++++++++++++
.../redis/common/hanlder/RedisMapperHandler.java | 57 +++++++
.../redis/common/mapper/RedisCommand.java | 23 ++-
.../common/mapper/RedisCommandDescription.java | 12 ++
.../redis/common/mapper/row/DecrByExMapper.java | 36 ++++
.../redis/common/mapper/row/DecrByMapper.java | 31 ++++
.../redis/common/mapper/row/HSetMapper.java | 32 ++++
.../redis/common/mapper/row/IncrByExMapper.java | 36 ++++
.../redis/common/mapper/row/IncrByMapper.java | 32 ++++
.../redis/common/mapper/row/LPushMapper.java | 32 ++++
.../redis/common/mapper/row/PfAddMapper.java | 32 ++++
.../redis/common/mapper/row/RPushMapper.java | 32 ++++
.../redis/common/mapper/row/RowRedisMapper.java | 109 ++++++++++++
.../redis/common/mapper/row/SAddMapper.java | 32 ++++
.../redis/common/mapper/row/SetExMapper.java | 36 ++++
.../redis/common/mapper/row/SetMapper.java | 32 ++++
.../redis/common/mapper/row/ZAddMapper.java | 32 ++++
.../connectors/redis/descriptor/Redis.java | 116 +++++++++++++
.../redis/descriptor/RedisVadidator.java | 36 ++++
...ng.connectors.redis.common.hanlder.RedisHandler | 29 ++++
.../org.apache.flink.table.factories.TableFactory | 16 ++
.../connectors/redis/RedisDescriptorTest.java | 99 +++++++++++
.../connectors/redis/common/RedisHandlerTest.java | 68 ++++++++
34 files changed, 1748 insertions(+), 1 deletion(-)
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 46d546b..2f9232a 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -43,6 +43,11 @@ under the License.
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
<dependency>
<groupId>redis.clients</groupId>
@@ -65,6 +70,30 @@ under the License.
<type>test-jar</type>
</dependency>
+ <!--
https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <!-- Either... (for the old planner that was available before Flink
1.9) -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <!-- Either... -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index e6fb355..4dfa61b 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -174,6 +174,18 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey),
key, value,
optAdditionalTTL.orElse(this.additionalTTL));
break;
+ case INCRBY:
+ this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
+ break;
+ case INCRBY_EX:
+ this.redisCommandsContainer.incrByEx(key, Long.valueOf(value),
optAdditionalTTL.orElse(this.additionalTTL));
+ break;
+ case DECRBY:
+ this.redisCommandsContainer.decrBy(key, Long.valueOf(value));
+ break;
+ case DESCRBY_EX:
+ this.redisCommandsContainer.decrByEx(key, Long.valueOf(value),
optAdditionalTTL.orElse(this.additionalTTL));
+ break;
default:
throw new IllegalArgumentException("Cannot process such data
type: " + redisCommand);
}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
new file mode 100644
index 0000000..4947fe0
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * @author Ameng .
+ * redis table sink to use redis in sql env.
+ */
+public class RedisTableSink implements UpsertStreamTableSink<Row> {
+
+ private FlinkJedisConfigBase flinkJedisConfigBase;
+ private RedisMapper redisMapper;
+ private TableSchema tableSchema;
+ private String[] keyFields;
+ private boolean isAppendOnly;
+ private Map<String, String> properties = null;
+
+
+ public RedisTableSink(Map<String, String> properties) {
+ this.properties = properties;
+ Preconditions.checkNotNull(properties, "properties should not be
null");
+ redisMapper = RedisHandlerServices
+ .findRedisHandler(RedisMapperHandler.class, properties)
+ .createRedisMapper(properties);
+ flinkJedisConfigBase = RedisHandlerServices
+ .findRedisHandler(FlinkJedisConfigHandler.class, properties)
+ .createFlinkJedisConfig(properties);
+ final DescriptorProperties descriptorProperties = new
DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+ tableSchema = descriptorProperties.getTableSchema(SCHEMA);
+ }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean,
Row>> dataStream) {
+ return dataStream.addSink(new RedisSink(flinkJedisConfigBase,
redisMapper))
+ .setParallelism(dataStream.getParallelism())
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(),
getFieldNames()));
+ }
+
+
+ @Override
+ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+ consumeDataStream(dataStream);
+ }
+
+ @Override
+ public TableSink configure(String[] fieldNames, TypeInformation[]
fieldTypes) {
+ return new RedisTableSink(getProperties());
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public void setKeyFields(String[] keys) {
+ this.keyFields = keys;
+ }
+
+ @Override
+ public void setIsAppendOnly(Boolean isAppendOnly) {
+ this.isAppendOnly = isAppendOnly;
+ }
+
+ @Override
+ public TypeInformation<Row> getRecordType() {
+ return tableSchema.toRowType();
+ }
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
new file mode 100644
index 0000000..2ad5ed3
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MASTER_NAME;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_SENTINEL;
+import static
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR;
+import static
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * @author Ameng .
+ * redis table sink factory for creare redis table sink.
+ */
+public class RedisTableSinkFactory implements
StreamTableSinkFactory<Tuple2<Boolean, Row>> {
+
+ @Override
+ public StreamTableSink<Tuple2<Boolean, Row>>
createStreamTableSink(Map<String, String> properties) {
+ return new RedisTableSink(properties);
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(CONNECTOR_TYPE, REDIS);
+ return require;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+ properties.add(REDIS_MODE);
+ properties.add(REDIS_COMMAND);
+ properties.add(REDIS_NODES);
+ properties.add(REDIS_MASTER_NAME);
+ properties.add(REDIS_SENTINEL);
+ properties.add(REDIS_KEY_TTL);
+ // schema
+ properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+ properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+ properties.add(SCHEMA + ".#." + SCHEMA_FROM);
+ // format wildcard
+ properties.add(FORMAT + ".*");
+ properties.add(CONNECTOR + ".*");
+ return properties;
+ }
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
new file mode 100644
index 0000000..b48db1b
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config.handler;
+
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_CLUSTER;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * @author Ameng .
+ * jedis cluster config handler to find and create jedis cluster config use
meta.
+ */
+public class FlinkJedisClusterConfigHandler implements FlinkJedisConfigHandler
{
+
+ @Override
+ public FlinkJedisConfigBase createFlinkJedisConfig(Map<String, String>
properties) {
+ Preconditions.checkArgument(properties.containsKey(REDIS_NODES),
"nodes should not be null in cluster mode");
+ String nodesInfo = properties.get(REDIS_NODES);
+ Set<InetSocketAddress> nodes =
Arrays.asList(nodesInfo.split(",")).stream().map(r -> {
+ String[] arr = r.split(":");
+ return new InetSocketAddress(arr[0].trim(),
Integer.parseInt(arr[1].trim()));
+ }).collect(Collectors.toSet());
+ return new FlinkJedisClusterConfig.Builder()
+ .setNodes(nodes).build();
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(REDIS_MODE, REDIS_CLUSTER);
+ return require;
+ }
+
+ public FlinkJedisClusterConfigHandler() {
+ }
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
new file mode 100644
index 0000000..fb852de
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisSentinelConfigHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config.handler;
+
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MASTER_NAME;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_SENTINEL;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.SENTINELS_INFO;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.SENTINELS_PASSWORD;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+
+public class FlinkJedisSentinelConfigHandler implements
FlinkJedisConfigHandler {
+
+ @Override
+ public FlinkJedisConfigBase createFlinkJedisConfig(Map<String, String>
properties) {
+ String masterName = properties.computeIfAbsent(REDIS_MASTER_NAME,
null);
+ String sentinelsInfo = properties.computeIfAbsent(SENTINELS_INFO,
null);
+ Objects.requireNonNull(masterName, "master should not be null in
sentinel mode");
+ Objects.requireNonNull(sentinelsInfo, "sentinels should not be null in
sentinel mode");
+ Set<String> sentinels = Arrays.asList(sentinelsInfo.split(","))
+ .stream().collect(Collectors.toSet());
+ String sentinelsPassword =
properties.computeIfAbsent(SENTINELS_PASSWORD, null);
+ if (sentinelsPassword != null && sentinelsPassword.trim().isEmpty()) {
+ sentinelsPassword = null;
+ }
+ FlinkJedisSentinelConfig flinkJedisSentinelConfig = new
FlinkJedisSentinelConfig.Builder()
+
.setMasterName(masterName).setSentinels(sentinels).setPassword(sentinelsPassword)
+ .build();
+ return flinkJedisSentinelConfig;
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(REDIS_MODE, REDIS_SENTINEL);
+ return require;
+ }
+
+ public FlinkJedisSentinelConfigHandler() {
+
+ }
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 886b94f..de0dfaa 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -188,6 +188,66 @@ public class RedisClusterContainer implements
RedisCommandsContainer, Closeable
}
}
+ @Override
+ public void incrByEx(String key, Long value, Integer ttl) {
+ try {
+ jedisCluster.incrBy(key, value);
+ if (ttl != null) {
+ jedisCluster.expire(key, ttl);
+ }
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command incrby and
ttl to key {} with increment {} and tll {} error message {}",
+ key, value, ttl, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void decrByEx(String key, Long value, Integer ttl) {
+ try {
+ jedisCluster.decrBy(key, value);
+ if (ttl != null) {
+ jedisCluster.expire(key, ttl);
+ }
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command descry and
ttl to key {} with increment {} and tll {} error message {}",
+ key, value, ttl, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+
+ @Override
+ public void incrBy(String key, Long value) {
+ try {
+ jedisCluster.incrBy(key, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command incrby to
key {} with increment {} and tll {} error message {}",
+ key, value, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void decrBy(String key, Long value) {
+ try {
+ jedisCluster.decrBy(key, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command descry to
key {} with decrement {} error message {}",
+ key, value, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+
/**
* Closes the {@link JedisCluster}.
*/
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 486784b..8adbd8d 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -127,6 +127,37 @@ public interface RedisCommandsContainer extends
Serializable {
*/
void zrem(String key, String element);
+
+ /**
+ * increase value to specified key and expire the key with fixed time.
+ * @param key the key name in which value to be set
+ * @param value the value
+ * @param ttl time to live (TTL)
+ */
+ void incrByEx(String key, Long value, Integer ttl);
+
+ /**
+ * decrease value from specified key and expire the key.
+ * @param key the key name in which value to be set
+ * @param value value the value
+ * @param ttl time to live (TTL)
+ */
+ void decrByEx(String key, Long value, Integer ttl);
+
+ /**
+ * increase value to specified key.
+ * @param key the key name in which value to be set
+ * @param value the value
+ */
+ void incrBy(String key, Long value);
+
+ /**
+ * decrease value from specified key.
+ * @param key the key name in which value to be set
+ * @param value value the value
+ */
+ void decrBy(String key, Long value);
+
/**
* Close the Jedis container.
*
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 4af84d1..bde54b5 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -286,4 +286,78 @@ public class RedisContainer implements
RedisCommandsContainer, Closeable {
LOG.error("Failed to close (return) instance to pool", e);
}
}
+
+ @Override
+ public void incrByEx(String key, Long value, Integer ttl) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.incrBy(key, value);
+ if (ttl != null) {
+ jedis.expire(key, ttl);
+ }
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis with incrby command with
increment {} with ttl {} error message {}",
+ key, value, ttl, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void decrByEx(String key, Long value, Integer ttl) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.decrBy(key, value);
+ if (ttl != null) {
+ jedis.expire(key, ttl);
+ }
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis with decrBy command with
decrement {} with ttl {} error message {}",
+ key, value, ttl, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void incrBy(String key, Long value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.incrBy(key, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis with incrby command with
increment {} error message {}",
+ key, value, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void decrBy(String key, Long value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.decrBy(key, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis with decrBy command with
increment {} error message {}",
+ key, value, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/FlinkJedisConfigHandler.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/FlinkJedisConfigHandler.java
new file mode 100644
index 0000000..d1969f3
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/FlinkJedisConfigHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import java.util.Map;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+
+/**
+ * @author Ameng .
+ * handler to create flink jedis config.
+ */
+public interface FlinkJedisConfigHandler extends RedisHandler {
+
+ /**
+ * create flink jedis config use sepecified properties.
+ * @param properties used to create flink jedis config
+ * @return flink jedis config
+ */
+ FlinkJedisConfigBase createFlinkJedisConfig(Map<String, String>
properties);
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandler.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandler.java
new file mode 100644
index 0000000..eb33782
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/***
+ * @author Ameng.
+ * redis handler to create redis mapper and flink jedis config.
+ */
+public interface RedisHandler extends Serializable {
+
+ /**
+ * require context for spi to find this redis handler.
+ * @return properties to find correct redis handler.
+ */
+ Map<String, String> requiredContext();
+
+ /**
+ * suppport properties used for this redis handler.
+ * @return support properties list
+ * @throws Exception
+ */
+ default List<String> supportProperties() throws Exception {
+ return Collections.emptyList();
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandlerServices.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandlerServices.java
new file mode 100644
index 0000000..ba8ff71
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisHandlerServices.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Ameng.
+ * Unified class to search for a {@link RedisHandler} of provided type and
properties.
+ * for find correct redis handler.
+ * @param <T> redis handler type.
+ */
+public class RedisHandlerServices<T> {
+
+ private static final ServiceLoader<RedisHandler> defaultLoader =
ServiceLoader.load(RedisHandler.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(RedisHandlerServices.class);
+
+ /**
+ * use specified class and properties to find redis handler.
+ * @param RedisHanlderClass specified redis handler class.
+ * @param meta properties to search redis handler
+ * @param <T>
+ * @return
+ */
+ public static <T extends RedisHandler> T findRedisHandler(Class<T>
RedisHanlderClass, Map<String, String> meta) {
+ Preconditions.checkNotNull(meta);
+ return findSingRedisHandler(RedisHanlderClass, meta, Optional.empty());
+ }
+
+
+ /**
+ * use specified class and properties and class loader to find redis
handler.
+ * @param RedisHanlderClass specified redis handler class.
+ * @param meta properties to search redis handler
+ * @param classLoader class loader to load redis handler class
+ * @param <T> redis handler
+ * @return matched redis handler
+ */
+ private static <T extends RedisHandler> T findSingRedisHandler(
+ Class<T> RedisHanlderClass,
+ Map<String, String> meta,
+ Optional<ClassLoader> classLoader) {
+
+ List<RedisHandler> redisHandlers = discoverRedisHanlder(classLoader);
+ List<T> filtered = filter(redisHandlers, RedisHanlderClass, meta);
+
+ return filtered.get(0);
+ }
+
+
+ /**
+ * Filters found redis by factory class and with matching context.
+ */
+ private static <T extends RedisHandler> List<T> filter(
+ List<RedisHandler> redis,
+ Class<T> redisClass,
+ Map<String, String> meta) {
+
+ Preconditions.checkNotNull(redisClass);
+ Preconditions.checkNotNull(meta);
+
+ List<T> redisFactories = filterByFactoryClass(
+ redisClass,
+ redis);
+
+ List<T> contextFactories = filterByContext(
+ meta,
+ redisFactories);
+ return contextFactories;
+ }
+
+ /**
+ * Searches for redis using Java service providers.
+ *
+ * @return all redis in the classpath
+ */
+ private static List<RedisHandler>
discoverRedisHanlder(Optional<ClassLoader> classLoader) {
+ try {
+ List<RedisHandler> result = new LinkedList<>();
+ if (classLoader.isPresent()) {
+ ServiceLoader
+ .load(RedisHandler.class, classLoader.get())
+ .iterator()
+ .forEachRemaining(result::add);
+ } else {
+ defaultLoader.iterator().forEachRemaining(result::add);
+ }
+ return result;
+ } catch (ServiceConfigurationError e) {
+ LOG.error("Could not load service provider for redis handler.", e);
+ throw new TableException("Could not load service provider for
redis handler.", e);
+ }
+
+ }
+
+ /**
+ * Filters factories with matching context by factory class.
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> List<T> filterByFactoryClass(
+ Class<T> redisClass,
+ List<RedisHandler> redis) {
+
+ List<RedisHandler> redisList = redis.stream()
+ .filter(p -> redisClass.isAssignableFrom(p.getClass()))
+ .collect(Collectors.toList());
+
+ if (redisList.isEmpty()) {
+ throw new RuntimeException(
+ String.format("No redis hanlder implements '%s'.",
redisClass.getCanonicalName()));
+ }
+
+ return (List<T>) redisList;
+ }
+
+ /**
+ * Filters for factories with matching context.
+ *
+ * @return all matching factories
+ */
+ private static <T extends RedisHandler> List<T> filterByContext(
+ Map<String, String> meta,
+ List<T> redisList) {
+
+ List<T> matchingredis = redisList.stream().filter(factory -> {
+ Map<String, String> requestedContext = normalizeContext(factory);
+
+ Map<String, String> plainContext = new HashMap<>(requestedContext);
+
+ // check if required context is met
+ return plainContext.keySet()
+ .stream()
+ .allMatch(e -> meta.containsKey(e) &&
meta.get(e).equals(plainContext.get(e)));
+ }).collect(Collectors.toList());
+
+ if (matchingredis.isEmpty()) {
+ throw new RuntimeException("no match redis");
+ }
+
+ return matchingredis;
+ }
+
+ /**
+ * Prepares the properties of a context to be used for match operations.
+ */
+ private static Map<String, String> normalizeContext(RedisHandler redis) {
+ Map<String, String> requiredContext = redis.requiredContext();
+ if (requiredContext == null) {
+ throw new RuntimeException(
+ String.format("Required context of redis '%s' must not be
null.", redis.getClass().getName()));
+ }
+ return requiredContext.keySet().stream()
+ .collect(Collectors.toMap(String::toLowerCase,
requiredContext::get));
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
new file mode 100644
index 0000000..a0b087c
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.hanlder;
+
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Ameng .
+ * handler for create redis mapper.
+ */
+public interface RedisMapperHandler extends RedisHandler {
+
+ Logger LOGGER = LoggerFactory.getLogger(RedisMapperHandler.class);
+
+ /**
+ * create a correct redis mapper use properties.
+ * @param properties to create redis mapper.
+ * @return redis mapper.
+ */
+ default RedisMapper createRedisMapper(Map<String, String> properties) {
+ String ttl = properties.get(REDIS_KEY_TTL);
+ try {
+ Class redisMapper =
Class.forName(this.getClass().getCanonicalName());
+
+ if (ttl == null) {
+ return (RedisMapper) redisMapper.newInstance();
+ }
+ Constructor c = redisMapper.getConstructor(Integer.class);
+ return (RedisMapper) c.newInstance(Integer.parseInt(ttl));
+ } catch (Exception e) {
+ LOGGER.error("create redis mapper failed", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index d465e83..282d15a 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -75,7 +75,28 @@ public enum RedisCommand {
* Sets field in the hash stored at key to value. If key does not exist,
* a new key holding a hash is created. If field already exists in the
hash, it is overwritten.
*/
- HSET(RedisDataType.HASH);
+ HSET(RedisDataType.HASH),
+
+ /**
+ * Delta plus for specified key.
+ */
+ INCRBY(RedisDataType.STRING),
+
+ /**
+ * Delta plus for specified key and expire the key with fixed time.
+ */
+ INCRBY_EX(RedisDataType.STRING),
+
+ /**
+ * decrease with fixed num for specified key.
+ */
+ DECRBY(RedisDataType.STRING),
+
+ /**
+ * decrease with fixed num for specified key and expire the key with fixed
time.
+ */
+ DESCRBY_EX(RedisDataType.STRING);
+
/**
* The {@link RedisDataType} this command belongs to.
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
index 3284361..ba1d8b7 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -83,6 +83,18 @@ public class RedisCommandDescription implements Serializable
{
throw new IllegalArgumentException("SETEX command should have
time to live (TTL)");
}
}
+
+ if (redisCommand.equals(RedisCommand.INCRBY_EX)) {
+ if (additionalTTL == null) {
+ throw new IllegalArgumentException("INCRBY_EX command should
have time to live (TTL)");
+ }
+ }
+
+ if (redisCommand.equals(RedisCommand.DESCRBY_EX)) {
+ if (additionalTTL == null) {
+ throw new IllegalArgumentException("INCRBY_EX command should
have time to live (TTL)");
+ }
+ }
}
/**
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByExMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByExMapper.java
new file mode 100644
index 0000000..0844079
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByExMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * decrease with expire operation redis mapper.
+ */
+public class DecrByExMapper extends RowRedisMapper {
+
+ public DecrByExMapper() {
+ super(RedisCommand.DESCRBY_EX);
+ }
+
+ public DecrByExMapper(Integer ttl) {
+ super(ttl, RedisCommand.DESCRBY_EX);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByMapper.java
new file mode 100644
index 0000000..071eed4
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/DecrByMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * decrease operation redis mapper.
+ */
+public class DecrByMapper extends RowRedisMapper {
+
+ public DecrByMapper() {
+ super(RedisCommand.DECRBY);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HSetMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HSetMapper.java
new file mode 100644
index 0000000..1cea53f
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/HSetMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * HSET operation redis mapper.
+ */
+public class HSetMapper extends RowRedisMapper {
+
+ public HSetMapper() {
+ super(RedisCommand.HSET);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByExMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByExMapper.java
new file mode 100644
index 0000000..0621334
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByExMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * Delta plus with expire key operation redis mapper.
+ */
+public class IncrByExMapper extends RowRedisMapper {
+
+ public IncrByExMapper() {
+ super(RedisCommand.INCRBY_EX);
+ }
+
+ public IncrByExMapper(Integer ttl) {
+ super(ttl, RedisCommand.INCRBY_EX);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByMapper.java
new file mode 100644
index 0000000..b0bd567
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/IncrByMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * Delta plus operation
+ */
+public class IncrByMapper extends RowRedisMapper {
+
+ public IncrByMapper() {
+ super(RedisCommand.INCRBY);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/LPushMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/LPushMapper.java
new file mode 100644
index 0000000..0f5e39c
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/LPushMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * LPUSH operation redis mapper.
+ */
+public class LPushMapper extends RowRedisMapper {
+
+ public LPushMapper() {
+ super(RedisCommand.LPUSH);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/PfAddMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/PfAddMapper.java
new file mode 100644
index 0000000..7596520
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/PfAddMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * PFADD operation redis mapper.
+ */
+public class PfAddMapper extends RowRedisMapper {
+
+ public PfAddMapper() {
+ super(RedisCommand.PFADD);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RPushMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RPushMapper.java
new file mode 100644
index 0000000..b0d1a6d
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RPushMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * RPUSH operation redis mapper.
+ */
+public class RPushMapper extends RowRedisMapper {
+
+ public RPushMapper() {
+ super(RedisCommand.RPUSH);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
new file mode 100644
index 0000000..fe294b8
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.api.java.tuple.Tuple2;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Ameng .
+ * base row redis mapper implement.
+ */
+public abstract class RowRedisMapper implements RedisMapper<Tuple2<Boolean,
Row>>, RedisMapperHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RowRedisMapper.class);
+
+ private Integer ttl;
+
+ private RedisCommand redisCommand;
+
+ public int getTtl() {
+ return ttl;
+ }
+
+ public void setTtl(int ttl) {
+ this.ttl = ttl;
+ }
+
+ public RedisCommand getRedisCommand() {
+ return redisCommand;
+ }
+
+ public void setRedisCommand(RedisCommand redisCommand) {
+ this.redisCommand = redisCommand;
+ }
+
+ public RowRedisMapper() {
+ }
+
+ public RowRedisMapper(int ttl, RedisCommand redisCommand) {
+ this.ttl = ttl;
+ this.redisCommand = redisCommand;
+ }
+
+ public RowRedisMapper(RedisCommand redisCommand) {
+ this.redisCommand = redisCommand;
+ }
+
+ @Override
+ public RedisCommandDescription getCommandDescription() {
+ if (ttl != null) {
+ return new RedisCommandDescription(redisCommand, ttl);
+ }
+ return new RedisCommandDescription(redisCommand);
+ }
+
+ @Override
+ public String getKeyFromData(Tuple2<Boolean, Row> data) {
+ return data.f1.getField(0).toString();
+ }
+
+ @Override
+ public String getValueFromData(Tuple2<Boolean, Row> data) {
+ return data.f1.getField(1).toString();
+ }
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> require = new HashMap<>();
+ require.put(REDIS_COMMAND, getRedisCommand().name());
+ return require;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ RedisCommand redisCommand = ((RowRedisMapper) obj).redisCommand;
+ return this.redisCommand == redisCommand;
+ }
+
+ @Override
+ public Optional<Integer> getAdditionalTTL(Tuple2<Boolean, Row> data) {
+ return Optional.ofNullable(getTtl());
+ }
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SAddMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SAddMapper.java
new file mode 100644
index 0000000..cee1347
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SAddMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * SADD operation redis mapper.
+ */
+public class SAddMapper extends RowRedisMapper {
+
+ public SAddMapper() {
+ super(RedisCommand.SADD);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetExMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetExMapper.java
new file mode 100644
index 0000000..a0f8bbb
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetExMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng.
+ * SET with expire key operation redis mapper.
+ */
+public class SetExMapper extends RowRedisMapper {
+
+ public SetExMapper() {
+ super(RedisCommand.SETEX);
+ }
+
+ public SetExMapper(Integer ttl) {
+ super(ttl, RedisCommand.SETEX);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetMapper.java
new file mode 100644
index 0000000..9e768c1
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/SetMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * SET operation redis mapper.
+ */
+public class SetMapper extends RowRedisMapper {
+
+ public SetMapper() {
+ super(RedisCommand.SET);
+ }
+
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java
new file mode 100644
index 0000000..3e5b7ed
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.mapper.row;
+
+
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+
+/**
+ * @author Ameng .
+ * ZADD operation redis mapper.
+ */
+public class ZAddMapper extends RowRedisMapper {
+
+ public ZAddMapper() {
+ super(RedisCommand.ZADD);
+ }
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
new file mode 100644
index 0000000..af21163
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/Redis.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.descriptor;
+
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_CLUSTER;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MASTER_NAME;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_SENTINEL;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * @author Ameng .
+ * redis descriptor for create redis connector.
+ */
+public class Redis extends ConnectorDescriptor {
+
+ Map<String, String> properties = new HashMap<>();
+
+ private String mode = null;
+ private String redisCommand = null;
+ private Integer ttl;
+
+ public Redis(String type, int version, boolean formatNeeded) {
+ super(REDIS, version, formatNeeded);
+ }
+
+ public Redis() {
+ this(REDIS, 1, false);
+ }
+
+ /**
+ * redis operation type.
+ * @param redisCommand redis operation type
+ * @return this descriptor.
+ */
+ public Redis command(String redisCommand) {
+ this.redisCommand = redisCommand;
+ properties.put(REDIS_COMMAND, redisCommand);
+ return this;
+ }
+
+ /**
+ * ttl for specified key.
+ * @param ttl time for key.
+ * @returnthis descriptor
+ */
+ public Redis ttl(Integer ttl) {
+ this.ttl = ttl;
+ properties.put(REDIS_KEY_TTL, String.valueOf(ttl));
+ return this;
+ }
+
+ /**
+ * redis mode to connect a specified redis cluster
+ * @param mode redis mode
+ * @return this descriptor
+ */
+ public Redis mode(String mode) {
+ this.mode = mode;
+ properties.put(REDIS_MODE, mode);
+ return this;
+ }
+
+ /**
+ * add properties used to connect to redis.
+ * @param k specified key
+ * @param v value for specified key
+ * @return this descriptor
+ */
+ public Redis property(String k, String v) {
+ properties.put(k, v);
+ return this;
+ }
+
+ @Override
+ protected Map<String, String> toConnectorProperties() {
+ validate();
+ return properties;
+ }
+
+ /**
+ * validate the necessary properties for redis descriptor.
+ */
+ public void validate() {
+ Preconditions.checkArgument(properties.containsKey(REDIS_COMMAND),
"need specified redis command");
+ if (mode.equalsIgnoreCase(REDIS_CLUSTER)) {
+ Preconditions.checkArgument(properties.containsKey(REDIS_NODES),
"cluster mode need cluster-nodes info");
+ } else if (mode.equalsIgnoreCase(REDIS_SENTINEL)) {
+
Preconditions.checkArgument(properties.containsKey(REDIS_MASTER_NAME),
"sentinel mode need master name");
+
Preconditions.checkArgument(properties.containsKey(REDIS_SENTINEL), "sentinel
mode need sentinel infos");
+ }
+ }
+}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisVadidator.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisVadidator.java
new file mode 100644
index 0000000..f9b604d
--- /dev/null
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisVadidator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.descriptor;
+
+/**
+ * @author Ameng .
+ * redis validator for validate redis descriptor.
+ */
+public class RedisVadidator {
+ public static final String REDIS = "redis";
+ public static final String REDIS_MODE = "redis-mode";
+ public static final String REDIS_NODES = "cluster-nodes";
+ public static final String REDIS_CLUSTER = "cluster";
+ public static final String REDIS_SENTINEL = "sentinel";
+ public static final String REDIS_COMMAND = "command";
+ public static final String REDIS_MASTER_NAME = "master.name";
+ public static final String SENTINELS_INFO = "sentinels.info";
+ public static final String SENTINELS_PASSWORD = "sentinels.password";
+ public static final String REDIS_KEY_TTL = "key.ttl";
+
+}
diff --git
a/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
new file mode 100644
index 0000000..7964e86
--- /dev/null
+++
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.redis.common.mapper.row.SetExMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.DecrByMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.DecrByExMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.HSetMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.IncrByMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.IncrByExMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.LPushMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.PfAddMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.RPushMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.SAddMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.SetMapper
+org.apache.flink.streaming.connectors.redis.common.mapper.row.ZAddMapper
+org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisClusterConfigHandler
+org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisSentinelConfigHandler
\ No newline at end of file
diff --git
a/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..fb97767
--- /dev/null
+++
b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.redis.RedisTableSinkFactory
\ No newline at end of file
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
new file mode 100644
index 0000000..2caf8f3
--- /dev/null
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisDescriptorTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.descriptor.Redis;
+import org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.types.Row;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RedisDescriptorTest extends RedisITCaseBase{
+
+ private static final String REDIS_KEY = "TEST_KEY";
+
+ StreamExecutionEnvironment env;
+
+ @Before
+ public void setUp(){
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ @Test
+ public void testRedisDescriptor() throws Exception {
+ DataStreamSource<Row> source = (DataStreamSource<Row>)
env.addSource(new TestSourceFunctionString())
+ .returns(new RowTypeInfo(TypeInformation.of(String.class),
TypeInformation.of(Long.class)));
+
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useOldPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
+ tableEnvironment.registerDataStream("t1", source, "k, v");
+
+ Redis redis = new Redis()
+ .mode(RedisVadidator.REDIS_CLUSTER)
+ .command(RedisCommand.INCRBY_EX.name())
+ .ttl(100000)
+ .property(RedisVadidator.REDIS_NODES, REDIS_HOST+ ":" +
REDIS_PORT);
+
+ tableEnvironment
+ .connect(redis).withSchema(new Schema()
+ .field("k", TypeInformation.of(String.class))
+ .field("v", TypeInformation.of(Long.class)))
+ .registerTableSink("redis");
+
+
+ tableEnvironment.sqlUpdate("insert into redis select k, v from t1");
+ env.execute("Test Redis Table");
+ }
+
+
+ private static class TestSourceFunctionString implements
SourceFunction<Row> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Row> ctx) throws Exception {
+ while (running) {
+ Row row = new Row(2);
+ row.setField(0, REDIS_KEY);
+ row.setField(1, 2L);
+ ctx.collect(row);
+ Thread.sleep(2000L);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+}
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
new file mode 100644
index 0000000..54a7fd9
--- /dev/null
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common;
+
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_CLUSTER;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_COMMAND;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_KEY_TTL;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_MODE;
+import static
org.apache.flink.streaming.connectors.redis.descriptor.RedisVadidator.REDIS_NODES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandlerServices;
+import
org.apache.flink.streaming.connectors.redis.common.hanlder.RedisMapperHandler;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import
org.apache.flink.streaming.connectors.redis.common.mapper.row.SetExMapper;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RedisHandlerTest extends AbstractTestBase {
+ public static final Map<String, String> properties = new HashMap<>();
+
+ @BeforeClass
+ public static void setUp() {
+ properties.put(REDIS_MODE, REDIS_CLUSTER);
+ properties.put(REDIS_COMMAND, RedisCommand.SETEX.name());
+ properties.put(REDIS_NODES, "localhost:8080");
+ properties.put(REDIS_KEY_TTL, "1000");
+ }
+
+ @Test
+ public void testRedisMapper() {
+ RedisMapper redisMapper =
RedisHandlerServices.findRedisHandler(RedisMapperHandler.class, properties)
+ .createRedisMapper(properties);
+ SetExMapper expectedMapper = new SetExMapper(1000);
+ assertEquals(redisMapper, expectedMapper);
+ }
+
+ @Test
+ public void testFlinkJedisConfigHandler() {
+ FlinkJedisConfigBase flinkJedisConfigBase = RedisHandlerServices
+ .findRedisHandler(FlinkJedisConfigHandler.class, properties)
+ .createFlinkJedisConfig(properties);
+ assertTrue(flinkJedisConfigBase instanceof FlinkJedisClusterConfig);
+ }
+}