[BAHIR-55] Add Redis connector from Flink Closes #1
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/b2955a74 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/b2955a74 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/b2955a74 Branch: refs/heads/master Commit: b2955a749e39cab55612917e4d5e702781f1e87c Parents: 9966a0c Author: Robert Metzger <[email protected]> Authored: Fri Aug 19 10:55:30 2016 +0200 Committer: Luciano Resende <[email protected]> Committed: Fri Aug 19 19:09:27 2016 -0700 ---------------------------------------------------------------------- dev/checkstyle.xml | 11 +- flink-connector-redis/pom.xml | 78 ++++++ .../streaming/connectors/redis/RedisSink.java | 188 ++++++++++++++ .../streaming/connectors/redis/common/Util.java | 25 ++ .../common/config/FlinkJedisClusterConfig.java | 188 ++++++++++++++ .../common/config/FlinkJedisConfigBase.java | 91 +++++++ .../common/config/FlinkJedisPoolConfig.java | 225 ++++++++++++++++ .../common/config/FlinkJedisSentinelConfig.java | 260 +++++++++++++++++++ .../common/container/RedisClusterContainer.java | 171 ++++++++++++ .../container/RedisCommandsContainer.java | 115 ++++++++ .../RedisCommandsContainerBuilder.java | 117 +++++++++ .../redis/common/container/RedisContainer.java | 252 ++++++++++++++++++ .../redis/common/mapper/RedisCommand.java | 86 ++++++ .../common/mapper/RedisCommandDescription.java | 93 +++++++ .../redis/common/mapper/RedisDataType.java | 66 +++++ .../redis/common/mapper/RedisMapper.java | 66 +++++ .../connectors/redis/RedisITCaseBase.java | 45 ++++ .../redis/RedisSentinelClusterTest.java | 99 +++++++ .../connectors/redis/RedisSinkITCase.java | 233 +++++++++++++++++ .../redis/RedisSinkPublishITCase.java | 137 ++++++++++ .../connectors/redis/RedisSinkTest.java | 143 ++++++++++ .../common/config/FlinkJedisConfigBaseTest.java | 50 ++++ .../common/config/JedisClusterConfigTest.java | 49 ++++ .../common/config/JedisPoolConfigTest.java | 29 +++ 
.../common/config/JedisSentinelConfigTest.java | 49 ++++ .../mapper/RedisDataTypeDescriptionTest.java | 41 +++ .../src/test/resources/log4j.properties | 27 ++ pom.xml | 11 - 28 files changed, 2928 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/dev/checkstyle.xml ---------------------------------------------------------------------- diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml index 3de6aa9..7a0558c 100644 --- a/dev/checkstyle.xml +++ b/dev/checkstyle.xml @@ -41,7 +41,7 @@ --> -<module name = "Checker"> +<module name="Checker"> <property name="charset" value="UTF-8"/> <property name="severity" value="error"/> @@ -78,10 +78,6 @@ <property name="allowByTailComment" value="true"/> <property name="allowNonPrintableEscapes" value="true"/> </module> - <module name="LineLength"> - <property name="max" value="100"/> - <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/> - </module> <module name="NoLineWrap"/> <module name="EmptyBlock"> <property name="option" value="TEXT"/> @@ -165,7 +161,10 @@ <property name="exceptionVariableName" value="expected"/> </module> <module name="CommentsIndentation"/> - <module name="UnusedImports"/> + <module name="UnusedImports"> + <!-- Allow imports for JavaDocs --> + <property name="processJavadoc" value="true"/> + </module> <module name="RedundantImport"/> <module name="RedundantModifier"/> </module> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml new file mode 100644 index 0000000..c34711e --- /dev/null +++ b/flink-connector-redis/pom.xml @@ -0,0 +1,78 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor 
license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink_parent_2.11</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-redis_2.11</artifactId> + <name>flink-connector-redis</name> + + <packaging>jar</packaging> + + <properties> + <jedis.version>2.8.0</jedis.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + <version>${jedis.version}</version> + </dependency> + + <dependency> + <groupId>com.github.kstyrc</groupId> + <artifactId>embedded-redis</artifactId> + <version>0.6</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + 
<type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java new file mode 100644 index 0000000..688f94a --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. 
+ * + * <p>Example: + * + * <pre> + *{@code + *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> { + * + * private RedisCommand redisCommand; + * + * public RedisExampleMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2<String, String> data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2<String, String> data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink<String>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH)); + *}</pre> + * + * @param <IN> Type of the elements emitted by this sink + */ +public class RedisSink<IN> extends RichSinkFunction<IN> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** + * This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. + * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. + * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. + * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. + * {@code additionalKey} used as hash name for {@link RedisDataType#HASH} + * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score. 
+ * {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET} + */ + private String additionalKey; + private RedisMapper<IN> redisSinkMapper; + private RedisCommand redisCommand; + + private FlinkJedisConfigBase flinkJedisConfigBase; + private RedisCommandsContainer redisCommandsContainer; + + /** + * Creates a new {@link RedisSink} that connects to the Redis server. + * + * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase} + * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements. + */ + public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) { + Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null"); + Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null"); + Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null"); + + this.flinkJedisConfigBase = flinkJedisConfigBase; + + this.redisSinkMapper = redisSinkMapper; + RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription(); + this.redisCommand = redisCommandDescription.getCommand(); + this.additionalKey = redisCommandDescription.getAdditionalKey(); + } + + /** + * Called when new data arrives to the sink, and forwards it to Redis channel. + * Depending on the specified Redis data type (see {@link RedisDataType}), + * a different Redis command will be applied. + * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD. 
+ * + * @param input The incoming data + */ + @Override + public void invoke(IN input) throws Exception { + String key = redisSinkMapper.getKeyFromData(input); + String value = redisSinkMapper.getValueFromData(input); + + switch (redisCommand) { + case RPUSH: + this.redisCommandsContainer.rpush(key, value); + break; + case LPUSH: + this.redisCommandsContainer.lpush(key, value); + break; + case SADD: + this.redisCommandsContainer.sadd(key, value); + break; + case SET: + this.redisCommandsContainer.set(key, value); + break; + case PFADD: + this.redisCommandsContainer.pfadd(key, value); + break; + case PUBLISH: + this.redisCommandsContainer.publish(key, value); + break; + case ZADD: + this.redisCommandsContainer.zadd(this.additionalKey, value, key); + break; + case HSET: + this.redisCommandsContainer.hset(this.additionalKey, key, value); + break; + default: + throw new IllegalArgumentException("Cannot process such data type: " + redisCommand); + } + } + + /** + * Initializes the connection to Redis by either cluster or sentinels or single server. + * + * @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null + */ + @Override + public void open(Configuration parameters) throws Exception { + try { + this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); + this.redisCommandsContainer.open(); + } catch (Exception e) { + LOG.error("Redis has not been properly initialized: ", e); + throw e; + } + } + + /** + * Closes commands container. + * @throws IOException if command container is unable to close. 
+ */ + @Override + public void close() throws IOException { + if (redisCommandsContainer != null) { + redisCommandsContainer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java new file mode 100644 index 0000000..b0e38b9 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common; + +public class Util { + public static void checkArgument(boolean condition, String message) { + if(!condition) { + throw new IllegalArgumentException(message); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java new file mode 100644 index 0000000..119ade3 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.Util; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** + * Configuration for Jedis cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private final Set<InetSocketAddress> nodes; + private final int maxRedirections; + + + /** + * Jedis cluster configuration. + * The list of node is mandatory, and when nodes is not set, it throws NullPointerException. + * + * @param nodes list of node information for JedisCluster + * @param connectionTimeout socket / connection timeout. The default is 2000 + * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK + * @param maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * @throws NullPointerException if parameter {@code nodes} is {@code null} + */ + private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + + Objects.requireNonNull(nodes, "Node information should be presented"); + Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty"); + this.nodes = new HashSet<>(nodes); + this.maxRedirections = maxRedirections; + } + + + + /** + * Returns nodes. 
+ * + * @return list of node information + */ + public Set<HostAndPort> getNodes() { + Set<HostAndPort> ret = new HashSet<>(); + for (InetSocketAddress node : nodes) { + ret.add(new HostAndPort(node.getHostName(), node.getPort())); + } + return ret; + } + + /** + * Returns limit of redirection. + * + * @return limit of redirection + */ + public int getMaxRedirections() { + return maxRedirections; + } + + + /** + * Builder for initializing {@link FlinkJedisClusterConfig}. + */ + public static class Builder { + private Set<InetSocketAddress> nodes; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int maxRedirections = 5; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets list of node. + * + * @param nodes list of node + * @return Builder itself + */ + public Builder setNodes(Set<InetSocketAddress> nodes) { + this.nodes = nodes; + return this; + } + + /** + * Sets socket / connection timeout. + * + * @param timeout socket / connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets limit of redirection. + * + * @param maxRedirections limit of redirection, default value is 5 + * @return Builder itself + */ + public Builder setMaxRedirections(int maxRedirections) { + this.maxRedirections = maxRedirections; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. 
+ * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Builds JedisClusterConfig. + * + * @return JedisClusterConfig + */ + public FlinkJedisClusterConfig build() { + return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisClusterConfig{" + + "nodes=" + nodes + + ", timeout=" + connectionTimeout + + ", maxRedirections=" + maxRedirections + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java new file mode 100644 index 
0000000..0d821ed --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.Util; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. 
+ */ +public abstract class FlinkJedisConfigBase implements Serializable { + private static final long serialVersionUID = 1L; + + protected final int maxTotal; + protected final int maxIdle; + protected final int minIdle; + protected final int connectionTimeout; + + protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){ + Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative"); + Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative"); + Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative"); + Util.checkArgument(minIdle >= 0, "minIdle value can not be negative"); + + this.connectionTimeout = connectionTimeout; + this.maxTotal = maxTotal; + this.maxIdle = maxIdle; + this.minIdle = minIdle; + } + + /** + * Returns timeout. + * + * @return connection timeout + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Get the value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @return The current setting of {@code maxTotal} for this + * configuration instance + * @see GenericObjectPoolConfig#getMaxTotal() + */ + public int getMaxTotal() { + return maxTotal; + } + + /** + * Get the value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @return The current setting of {@code maxIdle} for this + * configuration instance + * @see GenericObjectPoolConfig#getMaxIdle() + */ + public int getMaxIdle() { + return maxIdle; + } + + /** + * Get the value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. 
+ * + * @return The current setting of {@code minIdle} for this + * configuration instance + * @see GenericObjectPoolConfig#getMinIdle() + */ + public int getMinIdle() { + return minIdle; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java new file mode 100644 index 0000000..d4c30ff --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.Protocol; + +import java.util.Objects; + +/** + * Configuration for Jedis pool. 
+ */ +public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { + + private static final long serialVersionUID = 1L; + + private final String host; + private final int port; + private final int database; + private final String password; + + + /** + * Jedis pool configuration. + * The host is mandatory, and when host is not set, it throws NullPointerException. + * + * @param host hostname or IP + * @param port port, default value is 6379 + * @param connectionTimeout socket / connection timeout, default value is 2000 milli second + * @param password password, if any + * @param database database index + * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @throws NullPointerException if parameter {@code host} is {@code null} + */ + private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + Objects.requireNonNull(host, "Host information should be presented"); + this.host = host; + this.port = port; + this.database = database; + this.password = password; + } + + /** + * Returns host. + * + * @return hostname or IP + */ + public String getHost() { + return host; + } + + /** + * Returns port. + * + * @return port + */ + public int getPort() { + return port; + } + + + /** + * Returns database index. + * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** + * Returns password. + * + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Builder for initializing {@link FlinkJedisPoolConfig}. 
+ */ + public static class Builder { + private String host; + private int port = Protocol.DEFAULT_PORT; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int database = Protocol.DEFAULT_DATABASE; + private String password; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Sets host. + * + * @param host host + * @return Builder itself + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Sets port. + * + * @param port port, default value is 6379 + * @return Builder itself + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** + * Sets timeout. 
+ * + * @param timeout timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets database index. + * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + + /** + * Builds JedisPoolConfig. + * + * @return JedisPoolConfig + */ + public FlinkJedisPoolConfig build() { + return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisPoolConfig{" + + "host='" + host + '\'' + + ", port=" + port + + ", timeout=" + connectionTimeout + + ", database=" + database + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java new file mode 100644 index 0000000..6058a53 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** + * Configuration for Jedis Sentinel pool. + */ +public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class); + + private final String masterName; + private final Set<String> sentinels; + private final int soTimeout; + private final String password; + private final int database; + + /** + * Jedis Sentinels config. + * The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException. 
+ * + * @param masterName master name of the replica set + * @param sentinels set of sentinel hosts + * @param connectionTimeout timeout connection timeout + * @param soTimeout timeout socket timeout + * @param password password, if any + * @param database database database index + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * + * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null} + * @throws IllegalArgumentException if {@code sentinels} are empty + */ + private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels, + int connectionTimeout, int soTimeout, + String password, int database, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + Objects.requireNonNull(masterName, "Master name should be presented"); + Objects.requireNonNull(sentinels, "Sentinels information should be presented"); + Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty"); + + this.masterName = masterName; + this.sentinels = new HashSet<>(sentinels); + this.soTimeout = soTimeout; + this.password = password; + this.database = database; + } + + /** + * Returns master name of the replica set. + * + * @return master name of the replica set. + */ + public String getMasterName() { + return masterName; + } + + /** + * Returns Sentinels host addresses. + * + * @return Set of Sentinels host addresses + */ + public Set<String> getSentinels() { + return sentinels; + } + + /** + * Returns socket timeout. + * + * @return socket timeout + */ + public int getSoTimeout() { + return soTimeout; + } + + /** + * Returns password. + * + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Returns database index. 
+ * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** + * Builder for initializing {@link FlinkJedisSentinelConfig}. + */ + public static class Builder { + private String masterName; + private Set<String> sentinels; + private int connectionTimeout = Protocol.DEFAULT_TIMEOUT; + private int soTimeout = Protocol.DEFAULT_TIMEOUT; + private String password; + private int database = Protocol.DEFAULT_DATABASE; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets master name of the replica set. + * + * @param masterName master name of the replica set + * @return Builder itself + */ + public Builder setMasterName(String masterName) { + this.masterName = masterName; + return this; + } + + /** + * Sets sentinels address. + * + * @param sentinels host set of the sentinels + * @return Builder itself + */ + public Builder setSentinels(Set<String> sentinels) { + this.sentinels = sentinels; + return this; + } + + /** + * Sets connection timeout. + * + * @param connectionTimeout connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Sets socket timeout. + * + * @param soTimeout socket timeout, default value is 2000 + * @return Builder itself + */ + public Builder setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Sets database index. 
+ * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Builds JedisSentinelConfig. 
+ * + * @return JedisSentinelConfig + */ + public FlinkJedisSentinelConfig build(){ + return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout, + password, database, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisSentinelConfig{" + + "masterName='" + masterName + '\'' + + ", connectionTimeout=" + connectionTimeout + + ", soTimeout=" + soTimeout + + ", database=" + database + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java new file mode 100644 index 0000000..cc1d626 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private transient JedisCluster jedisCluster; + + /** + * Initialize Redis command container for Redis cluster. + * + * @param jedisCluster JedisCluster instance + */ + public RedisClusterContainer(JedisCluster jedisCluster) { + Objects.requireNonNull(jedisCluster, "Jedis cluster can not be null"); + + this.jedisCluster = jedisCluster; + } + + @Override + public void open() throws Exception { + + // echo() tries to open a connection and echos back the + // message passed as argument. Here we use it to monitor + // if we can communicate with the cluster. 
+ + jedisCluster.echo("Test"); + } + + @Override + public void hset(final String key, final String hashField, final String value) { + try { + jedisCluster.hset(key, hashField, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to hash {} error message {}", + key, hashField, e.getMessage()); + } + throw e; + } + } + + @Override + public void rpush(final String listName, final String value) { + try { + jedisCluster.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}", + listName, e.getMessage()); + } + throw e; + } + } + + @Override + public void lpush(String listName, String value) { + try { + jedisCluster.lpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}", + listName, e.getMessage()); + } + throw e; + } + } + + @Override + public void sadd(final String setName, final String value) { + try { + jedisCluster.sadd(setName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}", + setName, e.getMessage()); + } + throw e; + } + } + + @Override + public void publish(final String channelName, final String message) { + try { + jedisCluster.publish(channelName, message); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}", + channelName, e.getMessage()); + } + throw e; + } + } + + @Override + public void set(final String key, final String value) { + try { + jedisCluster.set(key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SET to key {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + @Override + 
public void pfadd(final String key, final String element) { + try { + jedisCluster.set(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PFADD to key {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + @Override + public void zadd(final String key, final String score, final String element) { + try { + jedisCluster.zadd(key, Double.valueOf(score), element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command ZADD to set {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + /** + * Closes the {@link JedisCluster}. + */ + @Override + public void close() throws IOException { + this.jedisCluster.close(); + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java new file mode 100644 index 0000000..78771f1 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Contract shared by all Redis command containers: one method per supported
 * Redis command, plus lifecycle hooks to open and close the underlying connection.
 */
public interface RedisCommandsContainer extends Serializable {

    /**
     * Opens the Jedis container.
     *
     * @throws Exception if the instance can not be opened properly
     */
    void open() throws Exception;

    /**
     * HSET: stores {@code value} under {@code hashField} in the hash at {@code key}.
     * A missing key is created as a new hash; an existing field is overwritten.
     *
     * @param key Hash name
     * @param hashField Hash field
     * @param value Hash value
     */
    void hset(String key, String hashField, String value);

    /**
     * RPUSH: appends the value to the tail of the list stored at the given key.
     * A missing key is created as an empty list before the push.
     *
     * @param listName Name of the List
     * @param value Value to be added
     */
    void rpush(String listName, String value);

    /**
     * LPUSH: inserts the value at the head of the list stored at the given key.
     * A missing key is created as an empty list before the push.
     *
     * @param listName Name of the List
     * @param value Value to be added
     */
    void lpush(String listName, String value);

    /**
     * SADD: adds the member to the set stored at the given key.
     * A member that is already present is ignored; a missing key is created as a new set.
     *
     * @param setName Name of the Set
     * @param value Value to be added
     */
    void sadd(String setName, String value);

    /**
     * PUBLISH: posts a message to the given channel.
     *
     * @param channelName Name of the channel to which data will be published
     * @param message the message
     */
    void publish(String channelName, String message);

    /**
     * SET: makes the key hold the string value. An existing value is overwritten
     * regardless of its type, and any previous time to live on the key is discarded
     * when the SET succeeds.
     *
     * @param key the key name in which value to be set
     * @param value the value
     */
    void set(String key, String value);

    /**
     * PFADD: adds the element to the HyperLogLog data structure stored at the given key.
     *
     * @param key The name of the key
     * @param element the element
     */
    void pfadd(String key, String element);

    /**
     * ZADD: adds the member with the given score to the sorted set stored at the key.
     *
     * @param key The name of the Sorted Set
     * @param score Score of the element
     * @param element element to be added
     */
    void zadd(String key, String score, String element);

    /**
     * Closes the Jedis container.
     *
     * @throws IOException if the instance can not be closed properly
     */
    void close() throws IOException;
}
+ */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.util.Objects; + +/** + * The builder for {@link RedisCommandsContainer}. + */ +public class RedisCommandsContainerBuilder { + + /** + * Initialize the {@link RedisCommandsContainer} based on the instance type. + * @param flinkJedisConfigBase configuration base + * @return @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null + */ + public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){ + if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){ + FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig); + } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) { + FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig); + } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) { + FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig); + } else { + throw new IllegalArgumentException("Jedis configuration not found"); + } + } + + /** + * Builds container for single Redis environment. 
+ * + * @param jedisPoolConfig configuration for JedisPool + * @return container for single Redis environment + * @throws NullPointerException if jedisPoolConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) { + Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); + + JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new RedisContainer(jedisPool); + } + + /** + * Builds container for Redis Cluster environment. + * + * @param jedisClusterConfig configuration for JedisCluster + * @return container for Redis Cluster environment + * @throws NullPointerException if jedisClusterConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) { + Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle()); + + JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(), + jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig); + return new RedisClusterContainer(jedisCluster); + } + + /** + * Builds container for Redis Sentinel environment. 
+ * + * @param jedisSentinelConfig configuration for JedisSentinel + * @return container for Redis sentinel environment + * @throws NullPointerException if jedisSentinelConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) { + Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle()); + + JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, + jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(), + jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase()); + return new RedisContainer(jedisSentinelPool); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java new file mode 100644 index 0000000..fb73a27 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private transient JedisPool jedisPool; + private transient JedisSentinelPool jedisSentinelPool; + + /** + * Use this constructor if to connect with single Redis server. 
+ * + * @param jedisPool JedisPool which actually manages Jedis instances + */ + public RedisContainer(JedisPool jedisPool) { + Objects.requireNonNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + this.jedisSentinelPool = null; + } + + /** + * Use this constructor if Redis environment is clustered with sentinels. + * + * @param sentinelPool SentinelPool which actually manages Jedis instances + */ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Objects.requireNonNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisPool = null; + this.jedisSentinelPool = sentinelPool; + } + + /** + * Closes the Jedis instances. + */ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void open() throws Exception { + + // echo() tries to open a connection and echos back the + // message passed as argument. Here we use it to monitor + // if we can communicate with the cluster. 
+ + getInstance().echo("Test"); + } + + @Override + public void hset(final String key, final String hashField, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(key, hashField, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}", + key, hashField, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void rpush(final String listName, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}", + listName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void lpush(String listName, String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.lpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command LUSH to list {} error message {}", + listName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void sadd(final String setName, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.sadd(setName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}", + setName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void publish(final String channelName, final String message) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.publish(channelName, message); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}", + 
channelName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void set(final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.set(key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SET to key {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void pfadd(final String key, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.pfadd(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PFADD to key {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void zadd(final String key, final String score, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.zadd(key, Double.valueOf(score), element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command ZADD to set {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + /** + * Returns Jedis instance from the pool. + * + * @return the Jedis instance + */ + private Jedis getInstance() { + if (jedisSentinelPool != null) { + return jedisSentinelPool.getResource(); + } else { + return jedisPool.getResource(); + } + } + + /** + * Closes the jedis instance after finishing the command. 
+ * + * @param jedis The jedis instance + */ + private void releaseInstance(final Jedis jedis) { + if (jedis == null) { + return; + } + try { + jedis.close(); + } catch (Exception e) { + LOG.error("Failed to close (return) instance to pool", e); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java new file mode 100644 index 0000000..cf9842c --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available commands for Redis. Each command belongs to a {@link RedisDataType} group. 
+ */ +public enum RedisCommand { + + /** + * Insert the specified value at the head of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operations. + */ + LPUSH(RedisDataType.LIST), + + /** + * Insert the specified value at the tail of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operation. + */ + RPUSH(RedisDataType.LIST), + + /** + * Add the specified member to the set stored at key. + * Specified member that is already a member of this set is ignored. + */ + SADD(RedisDataType.SET), + + /** + * Set key to hold the string value. If key already holds a value, + * it is overwritten, regardless of its type. + */ + SET(RedisDataType.STRING), + + /** + * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument. + */ + PFADD(RedisDataType.HYPER_LOG_LOG), + + /** + * Posts a message to the given channel. + */ + PUBLISH(RedisDataType.PUBSUB), + + /** + * Adds the specified members with the specified score to the sorted set stored at key. + */ + ZADD(RedisDataType.SORTED_SET), + + /** + * Sets field in the hash stored at key to value. If key does not exist, + * a new key holding a hash is created. If field already exists in the hash, it is overwritten. + */ + HSET(RedisDataType.HASH); + + /** + * The {@link RedisDataType} this command belongs to. + */ + private RedisDataType redisDataType; + + RedisCommand(RedisDataType redisDataType) { + this.redisDataType = redisDataType; + } + + + /** + * The {@link RedisDataType} this command belongs to. 
+ * @return the {@link RedisDataType} + */ + public RedisDataType getRedisDataType(){ + return redisDataType; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java new file mode 100644 index 0000000..6ab329f --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import java.io.Serializable; +import java.util.Objects; + +/** + * The description of the command type. This must be passed while creating new {@link RedisMapper}. 
+ * <p>When creating a descriptor for the group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}, + * you need to use the first constructor {@link #RedisCommandDescription(RedisCommand, String)}. + * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}. + * + * <p>When {@link RedisCommand} is not in the group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} + * you can use the second constructor {@link #RedisCommandDescription(RedisCommand)}. + */ +public class RedisCommandDescription implements Serializable { + + private static final long serialVersionUID = 1L; + + private RedisCommand redisCommand; + + /** + * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. + * Other {@link RedisDataType}s work with only two variables, i.e. the name of the list and the value to be added. + * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. + * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. + * {@link #getAdditionalKey()} is used as hash name for {@link RedisDataType#HASH}. + * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and its score. + * {@link #getAdditionalKey()} is used as set name for {@link RedisDataType#SORTED_SET}. + */ + private String additionalKey; + + /** + * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}. + * If a different data type is specified, {@code additionalKey} is ignored. 
+ * @param redisCommand the redis command type {@link RedisCommand} + * @param additionalKey additional key for Hash and Sorted set data type + */ + public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) { + Objects.requireNonNull(redisCommand, "Redis command type can not be null"); + this.redisCommand = redisCommand; + this.additionalKey = additionalKey; + + if (redisCommand.getRedisDataType() == RedisDataType.HASH || + redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) { + if (additionalKey == null) { + throw new IllegalArgumentException("Hash and Sorted Set should have additional key"); + } + } + } + + /** + * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}. + * + * @param redisCommand the redis command type {@link RedisCommand} + */ + public RedisCommandDescription(RedisCommand redisCommand) { + this(redisCommand, null); + } + + /** + * Returns the {@link RedisCommand}. + * + * @return the command type of the mapping + */ + public RedisCommand getCommand() { + return redisCommand; + } + + /** + * Returns the additional key if data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}. 
+ * + * @return the additional key + */ + public String getAdditionalKey() { + return additionalKey; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java new file mode 100644 index 0000000..989221c --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available data type for Redis. + */ +public enum RedisDataType { + + /** + * Strings are the most basic kind of Redis value. Redis Strings are binary safe, + * this means that a Redis string can contain any kind of data, for instance a JPEG image or a serialized Ruby object. 
+ * A String value can be at max 512 Megabytes in length. + */ + STRING, + + /** + * Redis Hashes are maps between string fields and string values. + */ + HASH, + + /** + * Redis Lists are simply lists of strings, sorted by insertion order. + */ + LIST, + + /** + * Redis Sets are an unordered collection of Strings. + */ + SET, + + /** + * Redis Sorted Sets are, similarly to Redis Sets, non repeating collections of Strings. + * The difference is that every member of a Sorted Set is associated with a score, + * which is used to keep the sorted set ordered, from the smallest to the greatest score. + * While members are unique, scores may be repeated. + */ + SORTED_SET, + + /** + * HyperLogLog is a probabilistic data structure used in order to count unique things. + */ + HYPER_LOG_LOG, + + /** + * Redis implementation of publish and subscribe paradigm. Published messages are characterized into channels, + * without knowledge of what (if any) subscribers there may be. + * Subscribers express interest in one or more channels, and only receive messages + * that are of interest, without knowledge of what (if any) publishers there are. + */ + PUBSUB +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java new file mode 100644 index 0000000..b2580a7 --- /dev/null +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * Function that creates the description of how the input data should be mapped to a redis type. + *<p>Example: + *<pre>{@code + *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> { + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(RedisCommand.PUBLISH); + * } + * public String getKeyFromData(Tuple2<String, String> data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2<String, String> data) { + * return data.f1; + * } + *} + *}</pre> + * + * @param <T> The type of the element handled by this {@code RedisMapper} + */ +public interface RedisMapper<T> extends Function, Serializable { + + /** + * Returns the descriptor which defines the Redis command. + * + * @return command descriptor + */ + RedisCommandDescription getCommandDescription(); + + /** + * Extracts key from data. + * + * @param data source data + * @return key + */ + String getKeyFromData(T data); + + /** + * Extracts value from data. + * + * @param data source data + * @return value + */ + String getValueFromData(T data); +}
