http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..dc5396a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+       /**
+        * Initialize the {@link RedisCommandsContainer} based on the instance 
type.
+        * @param flinkJedisConfigBase configuration base
+        * @return @throws IllegalArgumentException if jedisPoolConfig, 
jedisClusterConfig and jedisSentinelConfig are all null
+     */
+       public static RedisCommandsContainer build(FlinkJedisConfigBase 
flinkJedisConfigBase){
+               if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){
+                       FlinkJedisPoolConfig flinkJedisPoolConfig = 
(FlinkJedisPoolConfig) flinkJedisConfigBase;
+                       return 
RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
+               } else if (flinkJedisConfigBase instanceof 
FlinkJedisClusterConfig) {
+                       FlinkJedisClusterConfig flinkJedisClusterConfig = 
(FlinkJedisClusterConfig) flinkJedisConfigBase;
+                       return 
RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
+               } else if (flinkJedisConfigBase instanceof 
FlinkJedisSentinelConfig) {
+                       FlinkJedisSentinelConfig flinkJedisSentinelConfig = 
(FlinkJedisSentinelConfig) flinkJedisConfigBase;
+                       return 
RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
+               } else {
+                       throw new IllegalArgumentException("Jedis configuration 
not found");
+               }
+       }
+
+       /**
+        * Builds container for single Redis environment.
+        *
+        * @param jedisPoolConfig configuration for JedisPool
+        * @return container for single Redis environment
+        * @throws NullPointerException if jedisPoolConfig is null
+        */
+       public static RedisCommandsContainer build(FlinkJedisPoolConfig 
jedisPoolConfig) {
+               Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config 
should not be Null");
+
+               GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+               
genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+               
genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+               
genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+               JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
+                       jedisPoolConfig.getPort(), 
jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+                       jedisPoolConfig.getDatabase());
+               return new RedisContainer(jedisPool);
+       }
+
+       /**
+        * Builds container for Redis Cluster environment.
+        *
+        * @param jedisClusterConfig configuration for JedisCluster
+        * @return container for Redis Cluster environment
+        * @throws NullPointerException if jedisClusterConfig is null
+        */
+       public static RedisCommandsContainer build(FlinkJedisClusterConfig 
jedisClusterConfig) {
+               Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster 
config should not be Null");
+
+               GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+               
genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
+               
genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
+               
genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
+
+               JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(), 
jedisClusterConfig.getConnectionTimeout(),
+                       jedisClusterConfig.getMaxRedirections(), 
genericObjectPoolConfig);
+               return new RedisClusterContainer(jedisCluster);
+       }
+
+       /**
+        * Builds container for Redis Sentinel environment.
+        *
+        * @param jedisSentinelConfig configuration for JedisSentinel
+        * @return container for Redis sentinel environment
+        * @throws NullPointerException if jedisSentinelConfig is null
+        */
+       public static RedisCommandsContainer build(FlinkJedisSentinelConfig 
jedisSentinelConfig) {
+               Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel 
config should not be Null");
+
+               GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+               
genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
+               
genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
+               
genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
+
+               JedisSentinelPool jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+                       jedisSentinelConfig.getSentinels(), 
genericObjectPoolConfig,
+                       jedisSentinelConfig.getConnectionTimeout(), 
jedisSentinelConfig.getSoTimeout(),
+                       jedisSentinelConfig.getPassword(), 
jedisSentinelConfig.getDatabase());
+               return new RedisContainer(jedisSentinelPool);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
new file mode 100644
index 0000000..ba4bbda
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or 
to Redis sentinels
+ * If want to connect to a single Redis server, please use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, please use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+       private transient JedisPool jedisPool;
+       private transient JedisSentinelPool jedisSentinelPool;
+
+       /**
+        * Use this constructor if to connect with single Redis server.
+        *
+        * @param jedisPool JedisPool which actually manages Jedis instances
+        */
+       public RedisContainer(JedisPool jedisPool) {
+               Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+               this.jedisPool = jedisPool;
+               this.jedisSentinelPool = null;
+       }
+
+       /**
+        * Use this constructor if Redis environment is clustered with 
sentinels.
+        *
+        * @param sentinelPool SentinelPool which actually manages Jedis 
instances
+        */
+       public RedisContainer(final JedisSentinelPool sentinelPool) {
+               Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+               this.jedisPool = null;
+               this.jedisSentinelPool = sentinelPool;
+       }
+
+       /**
+        * Closes the Jedis instances.
+        */
+       @Override
+       public void close() throws IOException {
+               if (this.jedisPool != null) {
+                       this.jedisPool.close();
+               }
+               if (this.jedisSentinelPool != null) {
+                       this.jedisSentinelPool.close();
+               }
+       }
+
+       @Override
+       public void open() throws Exception {
+
+               // echo() tries to open a connection and echos back the
+               // message passed as argument. Here we use it to monitor
+               // if we can communicate with the cluster.
+
+               getInstance().echo("Test");
+       }
+
+       @Override
+       public void hset(final String key, final String hashField, final String 
value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.hset(key, hashField, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command HSET to key {} and hashField {} error message {}",
+                                       key, hashField, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void rpush(final String listName, final String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.rpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void lpush(String listName, String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.lpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command LUSH to list {} error message {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void sadd(final String setName, final String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.sadd(setName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to set {} error message {}",
+                                       setName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void publish(final String channelName, final String message) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.publish(channelName, message);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PUBLISH to channel {} error message {}",
+                                       channelName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void set(final String key, final String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.set(key, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command SET to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void pfadd(final String key, final String element) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.pfadd(key, element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PFADD to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void zadd(final String key, final String score, final String 
element) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.zadd(key, Double.valueOf(score), element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command ZADD to set {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       /**
+        * Returns Jedis instance from the pool.
+        *
+        * @return the Jedis instance
+     */
+       private Jedis getInstance() {
+               if (jedisSentinelPool != null) {
+                       return jedisSentinelPool.getResource();
+               } else {
+                       return jedisPool.getResource();
+               }
+       }
+
+       /**
+        * Closes the jedis instance after finishing the command.
+        *
+        * @param jedis The jedis instance
+     */
+       private void releaseInstance(final Jedis jedis) {
+               if (jedis == null) {
+                       return;
+               }
+               try {
+                       jedis.close();
+               } catch (Exception e) {
+                       LOG.error("Failed to close (return) instance to pool", 
e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
new file mode 100644
index 0000000..b0661c7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+       /**
+        * Insert the specified value at the head of the list stored at key.
+        * If key does not exist, it is created as empty list before performing 
the push operations.
+        */
+       LPUSH(RedisDataType.LIST),
+
+       /**
+        * Insert the specified value at the tail of the list stored at key.
+        * If key does not exist, it is created as empty list before performing 
the push operation.
+        */
+       RPUSH(RedisDataType.LIST),
+
+       /**
+        * Add the specified member to the set stored at key.
+        * Specified member that is already a member of this set is ignored.
+        */
+       SADD(RedisDataType.SET),
+
+       /**
+        * Set key to hold the string value. If key already holds a value,
+        * it is overwritten, regardless of its type.
+        */
+       SET(RedisDataType.STRING),
+
+       /**
+        * Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
+        */
+       PFADD(RedisDataType.HYPER_LOG_LOG),
+
+       /**
+        * Posts a message to the given channel.
+        */
+       PUBLISH(RedisDataType.PUBSUB),
+
+       /**
+        * Adds the specified members with the specified score to the sorted 
set stored at key.
+        */
+       ZADD(RedisDataType.SORTED_SET),
+
+       /**
+        * Sets field in the hash stored at key to value. If key does not exist,
+        * a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
+        */
+       HSET(RedisDataType.HASH);
+
+       /**
+        * The {@link RedisDataType} this command belongs to.
+        */
+       private RedisDataType redisDataType;
+
+       RedisCommand(RedisDataType redisDataType) {
+               this.redisDataType = redisDataType;
+       }
+
+
+       /**
+        * The {@link RedisDataType} this command belongs to.
+        * @return the {@link RedisDataType}
+        */
+       public RedisDataType getRedisDataType(){
+               return redisDataType;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 0000000..1eea48a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * The description of the command type. This must be passed while creating new 
{@link RedisMapper}.
+ * <p>When creating descriptor for the group of {@link RedisDataType#HASH} and 
{@link RedisDataType#SORTED_SET},
+ * you need to use first constructor {@link 
#RedisCommandDescription(RedisCommand, String)}.
+ * If the {@code additionalKey} is {@code null} it will throw {@code 
IllegalArgumentException}
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} 
and {@link RedisDataType#SORTED_SET}
+ * you can use second constructor {@link 
#RedisCommandDescription(RedisCommand)}
+ */
+public class RedisCommandDescription implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private RedisCommand redisCommand;
+
+       /**
+        * This additional key is needed for the group {@link 
RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+        * Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+        * But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+        * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+        * {@link #getAdditionalKey()} used as hash name for {@link 
RedisDataType#HASH}
+        * <p>For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+        * {@link #getAdditionalKey()} used as set name for {@link 
RedisDataType#SORTED_SET}
+        */
+       private String additionalKey;
+
+       /**
+        * Use this constructor when data type is {@link RedisDataType#HASH} or 
{@link RedisDataType#SORTED_SET}.
+        * If different data type is specified, {@code additionalKey} is 
ignored.
+        * @param redisCommand the redis command type {@link RedisCommand}
+        * @param additionalKey additional key for Hash and Sorted set data type
+        */
+       public RedisCommandDescription(RedisCommand redisCommand, String 
additionalKey) {
+               Preconditions.checkNotNull(redisCommand, "Redis command type 
can not be null");
+               this.redisCommand = redisCommand;
+               this.additionalKey = additionalKey;
+
+               if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
+                       redisCommand.getRedisDataType() == 
RedisDataType.SORTED_SET) {
+                       if (additionalKey == null) {
+                               throw new IllegalArgumentException("Hash and 
Sorted Set should have additional key");
+                       }
+               }
+       }
+
+       /**
+        * Use this constructor when command type is not in group {@link 
RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+        *
+        * @param redisCommand the redis data type {@link RedisCommand}
+        */
+       public RedisCommandDescription(RedisCommand redisCommand) {
+               this(redisCommand, null);
+       }
+
+       /**
+        * Returns the {@link RedisCommand}.
+        *
+        * @return the command type of the mapping
+        */
+       public RedisCommand getCommand() {
+               return redisCommand;
+       }
+
+       /**
+        * Returns the additional key if data type is {@link 
RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+        *
+        * @return the additional key
+        */
+       public String getAdditionalKey() {
+               return additionalKey;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
new file mode 100644
index 0000000..6e3997c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available data type for Redis.
+ */
+public enum RedisDataType {
+
+       /**
+        * Strings are the most basic kind of Redis value. Redis Strings are 
binary safe,
+        * this means that a Redis string can contain any kind of data, for 
instance a JPEG image or a serialized Ruby object.
+        * A String value can be at max 512 Megabytes in length.
+        */
+       STRING,
+
+       /**
+        * Redis Hashes are maps between string fields and string values.
+        */
+       HASH,
+
+       /**
+        * Redis Lists are simply lists of strings, sorted by insertion order.
+        */
+       LIST,
+
+       /**
+        * Redis Sets are an unordered collection of Strings.
+        */
+       SET,
+
+       /**
+        * Redis Sorted Sets are, similarly to Redis Sets, non repeating 
collections of Strings.
+        * The difference is that every member of a Sorted Set is associated 
with score,
+        * that is used in order to take the sorted set ordered, from the 
smallest to the greatest score.
+        * While members are unique, scores may be repeated.
+        */
+       SORTED_SET,
+
+       /**
+        * HyperLogLog is a probabilistic data structure used in order to count 
unique things.
+        */
+       HYPER_LOG_LOG,
+
+       /**
+        * Redis implementation of publish and subscribe paradigm. Published 
messages are characterized into channels,
+        * without knowledge of what (if any) subscribers there may be.
+        * Subscribers express interest in one or more channels, and only 
receive messages
+        * that are of interest, without knowledge of what (if any) publishers 
there are.
+        */
+       PUBSUB
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..63fed19
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be mapped 
to redis type.
+ *<p>Example:
+ *<pre>{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, 
String>> {
+ *    public RedisDataTypeDescription getCommandDescription() {
+ *        return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *}</pre>
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+       /**
+        * Returns descriptor which defines data type.
+        *
+        * @return data type descriptor
+        */
+       RedisCommandDescription getCommandDescription();
+
+       /**
+        * Extracts key from data.
+        *
+        * @param data source data
+        * @return key
+        */
+       String getKeyFromData(T data);
+
+       /**
+        * Extracts value from data.
+        *
+        * @param data source data
+        * @return value
+        */
+       String getValueFromData(T data);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..7d98f2d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public abstract class RedisITCaseBase extends 
StreamingMultipleProgramsTestBase {
+
+       public static final int REDIS_PORT = getAvailablePort();
+       public static final String REDIS_HOST = "127.0.0.1";
+
+       private static RedisServer redisServer;
+
+       @BeforeClass
+       public static void createRedisServer() throws IOException, 
InterruptedException {
+               redisServer = new RedisServer(REDIS_PORT);
+               redisServer.start();
+       }
+
+       @AfterClass
+       public static void stopRedisServer(){
+               redisServer.stop();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
new file mode 100644
index 0000000..dc59ba4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public class RedisSentinelClusterTest extends TestLogger {
+
+       private static RedisCluster cluster;
+       private static final String REDIS_MASTER = "master";
+       private static final String TEST_KEY = "testKey";
+       private static final String TEST_VALUE = "testValue";
+       private static final List<Integer> sentinels = 
Arrays.asList(getAvailablePort(), getAvailablePort());
+       private static final List<Integer> group1 = 
Arrays.asList(getAvailablePort(), getAvailablePort());
+
+       private JedisSentinelPool jedisSentinelPool;
+       private FlinkJedisSentinelConfig jedisSentinelConfig;
+
+       @BeforeClass
+       public static void setUpCluster(){
+               cluster = 
RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
+                       .serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
+                       .build();
+               cluster.start();
+       }
+
+       @Before
+       public void setUp() {
+               Set<String> hosts = JedisUtil.sentinelHosts(cluster);
+               jedisSentinelConfig = new 
FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
+                       .setSentinels(hosts).build();
+               jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+                       jedisSentinelConfig.getSentinels());
+       }
+
+       @Test
+       public void testRedisSentinelOperation() {
+               RedisCommandsContainer redisContainer = 
RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+               Jedis jedis = null;
+               try{
+                       jedis = jedisSentinelPool.getResource();
+                       redisContainer.set(TEST_KEY, TEST_VALUE);
+                       assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
+               }finally {
+                       if (jedis != null){
+                               jedis.close();
+                       }
+               }
+       }
+
+       @After
+       public void tearDown() throws IOException {
+               if (jedisSentinelPool != null) {
+                       jedisSentinelPool.close();
+               }
+       }
+
+       @AfterClass
+       public static void tearDownCluster() throws IOException {
+               if (!cluster.isActive()) {
+                       cluster.stop();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
new file mode 100644
index 0000000..21f3cca
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkITCase extends RedisITCaseBase {
+
+       private FlinkJedisPoolConfig jedisPoolConfig;
+       private static final Long NUM_ELEMENTS = 20L;
+       private static final String REDIS_KEY = "TEST_KEY";
+       private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+       StreamExecutionEnvironment env;
+
+
+       private Jedis jedis;
+
+       @Before
+       public void setUp(){
+               jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+                       .setHost(REDIS_HOST)
+                       .setPort(REDIS_PORT).build();
+               jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+               env = StreamExecutionEnvironment.getExecutionEnvironment();
+       }
+
+       @Test
+       public void testRedisListDataType() throws Exception {
+               DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                       new RedisCommandMapper(RedisCommand.LPUSH));
+
+               source.addSink(redisSink);
+               env.execute("Test Redis List Data Type");
+
+               assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+               jedis.del(REDIS_KEY);
+       }
+
+       @Test
+       public void testRedisSetDataType() throws Exception {
+               DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                       new RedisCommandMapper(RedisCommand.SADD));
+
+               source.addSink(redisSink);
+               env.execute("Test Redis Set Data Type");
+
+               assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+               jedis.del(REDIS_KEY);
+       }
+
+       @Test
+       public void testRedisHyperLogLogDataType() throws Exception {
+               DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                       new RedisCommandMapper(RedisCommand.PFADD));
+
+               source.addSink(redisSink);
+               env.execute("Test Redis Hyper Log Log Data Type");
+
+               assertEquals(NUM_ELEMENTS, 
Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+               jedis.del(REDIS_KEY);
+       }
+
+       @Test
+       public void testRedisSortedSetDataType() throws Exception {
+               DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunctionSortedSet());
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                       new RedisAdditionalDataMapper(RedisCommand.ZADD));
+
+               source.addSink(redisSink);
+               env.execute("Test Redis Sorted Set Data Type");
+
+               assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+
+               jedis.del(REDIS_ADDITIONAL_KEY);
+       }
+
+       @Test
+       public void testRedisHashDataType() throws Exception {
+               DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunctionHash());
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig,
+                       new RedisAdditionalDataMapper(RedisCommand.HSET));
+
+               source.addSink(redisSink);
+               env.execute("Test Redis Hash Data Type");
+
+               assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+
+               jedis.del(REDIS_ADDITIONAL_KEY);
+       }
+
+       @After
+       public void tearDown(){
+               if(jedis != null){
+                       jedis.close();
+               }
+       }
+
+       private static class TestSourceFunction implements 
SourceFunction<Tuple2<String, String>> {
+               private static final long serialVersionUID = 1L;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Tuple2<String, String>> ctx) 
throws Exception {
+                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                               ctx.collect(new Tuple2<>(REDIS_KEY, "message #" 
+ i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       private static class TestSourceFunctionHash implements 
SourceFunction<Tuple2<String, String>> {
+               private static final long serialVersionUID = 1L;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Tuple2<String, String>> ctx) 
throws Exception {
+                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                               ctx.collect(new Tuple2<>("" + i, "message #" + 
i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       private static class TestSourceFunctionSortedSet implements 
SourceFunction<Tuple2<String, String>> {
+               private static final long serialVersionUID = 1L;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Tuple2<String, String>> ctx) 
throws Exception {
+                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                               ctx.collect(new Tuple2<>( "message #" + i, "" + 
i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       public static class RedisCommandMapper implements 
RedisMapper<Tuple2<String, String>>{
+
+               private RedisCommand redisCommand;
+
+               public RedisCommandMapper(RedisCommand redisCommand){
+                       this.redisCommand = redisCommand;
+               }
+
+               @Override
+               public RedisCommandDescription getCommandDescription() {
+                       return new RedisCommandDescription(redisCommand);
+               }
+
+               @Override
+               public String getKeyFromData(Tuple2<String, String> data) {
+                       return data.f0;
+               }
+
+               @Override
+               public String getValueFromData(Tuple2<String, String> data) {
+                       return data.f1;
+               }
+       }
+
+       public static class RedisAdditionalDataMapper implements 
RedisMapper<Tuple2<String, String>>{
+
+               private RedisCommand redisCommand;
+
+               public RedisAdditionalDataMapper(RedisCommand redisCommand){
+                       this.redisCommand = redisCommand;
+               }
+
+               @Override
+               public RedisCommandDescription getCommandDescription() {
+                       return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+               }
+
+               @Override
+               public String getKeyFromData(Tuple2<String, String> data) {
+                       return data.f0;
+               }
+
+               @Override
+               public String getValueFromData(Tuple2<String, String> data) {
+                       return data.f1;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
new file mode 100644
index 0000000..caf3945
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishITCase extends RedisITCaseBase {
+
+       private static final int NUM_ELEMENTS = 20;
+       private static final String REDIS_CHANNEL = "CHANNEL";
+
+       private static final List<String> sourceList = new ArrayList<>();
+       private Thread sinkThread;
+       private PubSub pubSub;
+
+       @Before
+       public void before() throws Exception {
+               pubSub = new PubSub();
+               sinkThread = new Thread(new Subscribe(pubSub));
+       }
+
+       @Test
+       public void redisSinkTest() throws Exception {
+               sinkThread.start();
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               FlinkJedisPoolConfig jedisPoolConfig = new 
FlinkJedisPoolConfig.Builder()
+                       .setHost(REDIS_HOST)
+                       .setPort(REDIS_PORT).build();
+               DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction());
+
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+               source.addSink(redisSink);
+
+               env.execute("Redis Sink Test");
+
+               assertEquals(NUM_ELEMENTS, sourceList.size());
+       }
+
+       @After
+       public void after() throws Exception {
+               pubSub.unsubscribe();
+               sinkThread.join();
+               sourceList.clear();
+       }
+
+       private class Subscribe implements Runnable {
+               private PubSub localPubSub;
+               private Subscribe(PubSub pubSub){
+                       this.localPubSub = pubSub;
+               }
+
+               @Override
+               public void run() {
+                       JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+                       pool.getResource().subscribe(localPubSub, 
REDIS_CHANNEL);
+               }
+       }
+
+       private static class TestSourceFunction implements 
SourceFunction<Tuple2<String, String>> {
+               private static final long serialVersionUID = 1L;
+
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<Tuple2<String, String>> ctx) 
throws Exception {
+                       for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                               ctx.collect(new Tuple2<>(REDIS_CHANNEL, 
"message #" + i));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+
+       public static class PubSub extends JedisPubSub {
+
+               @Override
+               public void onMessage(String channel, String message) {
+                       sourceList.add(message);
+               }
+
+       }
+
+       private static class RedisTestMapper implements 
RedisMapper<Tuple2<String, String>>{
+
+               @Override
+               public RedisCommandDescription getCommandDescription() {
+                       return new 
RedisCommandDescription(RedisCommand.PUBLISH);
+               }
+
+               @Override
+               public String getKeyFromData(Tuple2<String, String> data) {
+                       return data.f0;
+               }
+
+               @Override
+               public String getValueFromData(Tuple2<String, String> data) {
+                       return data.f1;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
new file mode 100644
index 0000000..59f59f2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+
+public class RedisSinkTest extends TestLogger {
+
+       @Test(expected=NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfDataMapperIsNull(){
+               new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), 
null);
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull(){
+               new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), 
new TestMapper(null));
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointerExceptionIfConfigurationIsNull(){
+               new RedisSink<>(null, new TestMapper(new 
RedisCommandDescription(RedisCommand.LPUSH)));
+       }
+
+       @Test
+       public void testRedisDownBehavior() throws Exception {
+
+               // create a wrong configuration so that open() fails.
+
+               FlinkJedisPoolConfig wrongJedisPoolConfig = new 
FlinkJedisPoolConfig.Builder()
+                       .setHost("127.0.0.1")
+                       .setPort(1234).build();
+
+               testDownBehavior(wrongJedisPoolConfig);
+       }
+
+       @Test
+       public void testRedisClusterDownBehavior() throws Exception {
+
+               Set<InetSocketAddress> hosts = new HashSet<>();
+               hosts.add(new InetSocketAddress("127.0.0.1", 1234));
+
+               // create a wrong configuration so that open() fails.
+
+               FlinkJedisClusterConfig wrongJedisClusterConfig = new 
FlinkJedisClusterConfig.Builder()
+                       .setNodes(hosts)
+                       .setTimeout(100)
+                       .setMaxIdle(1)
+                       .setMaxTotal(1)
+                       .setMinIdle(1).build();
+
+               testDownBehavior(wrongJedisClusterConfig);
+       }
+
+       @Test
+       public void testRedisSentinelDownBehavior() throws Exception {
+
+               Set<String> hosts = new HashSet<>();
+               hosts.add("localhost:55095");
+
+               // create a wrong configuration so that open() fails.
+
+               FlinkJedisSentinelConfig wrongJedisSentinelConfig = new 
FlinkJedisSentinelConfig.Builder()
+                       .setMasterName("master")
+                       .setSentinels(hosts)
+                       .build();
+
+               testDownBehavior(wrongJedisSentinelConfig);
+       }
+
+       private void testDownBehavior(FlinkJedisConfigBase config) throws 
Exception {
+               RedisSink<Tuple2<String, String>> redisSink = new 
RedisSink<>(config,
+                       new 
RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+               try {
+                       redisSink.open(new Configuration());
+               } catch (Exception e) {
+
+                       // search for nested JedisConnectionExceptions
+                       // because this is the expected behavior
+
+                       Throwable t = e;
+                       int depth = 0;
+                       while (!(t instanceof JedisConnectionException)) {
+                               t = t.getCause();
+                               if (t == null || depth++ == 20) {
+                                       throw e;
+                               }
+                       }
+               }
+       }
+
+       private class TestMapper implements RedisMapper<Tuple2<String, String>>{
+               private RedisCommandDescription redisCommandDescription;
+
+               public TestMapper(RedisCommandDescription 
redisCommandDescription){
+                       this.redisCommandDescription = redisCommandDescription;
+               }
+               @Override
+               public RedisCommandDescription getCommandDescription() {
+                       return redisCommandDescription;
+               }
+
+               @Override
+               public String getKeyFromData(Tuple2<String, String> data) {
+                       return data.f0;
+               }
+
+               @Override
+               public String getValueFromData(Tuple2<String, String> data) {
+                       return data.f1;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
new file mode 100644
index 0000000..ed1d713
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class FlinkJedisConfigBaseTest extends TestLogger {
+
+       @Test(expected = IllegalArgumentException.class)
+       public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
+               new TestConfig(-1, 0, 0, 0);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
+               new TestConfig(1, -1, 0, 0);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
+               new TestConfig(0, 0, -1, 0);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
+               new TestConfig(0, 0, 0, -1);
+       }
+
+       private class TestConfig extends FlinkJedisConfigBase{
+
+               protected TestConfig(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle) {
+                       super(connectionTimeout, maxTotal, maxIdle, minIdle);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
new file mode 100644
index 0000000..40db578
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisClusterConfigTest extends TestLogger {
+
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfNodeValueIsNull(){
+               FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+               builder.setMinIdle(0)
+                       .setMaxIdle(0)
+                       .setMaxTotal(0)
+                       .setTimeout(0)
+                       .build();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty(){
+               Set<InetSocketAddress> set = new HashSet<>();
+               FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+               builder.setMinIdle(0)
+                       .setMaxIdle(0)
+                       .setMaxTotal(0)
+                       .setTimeout(0)
+                       .setNodes(set)
+                       .build();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
new file mode 100644
index 0000000..dc16cfe
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class JedisPoolConfigTest extends TestLogger {
+
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfHostValueIsNull(){
+               FlinkJedisPoolConfig.Builder builder = new 
FlinkJedisPoolConfig.Builder();
+               builder.build();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
new file mode 100644
index 0000000..8445fae
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisSentinelConfigTest extends TestLogger {
+
+       public static final String MASTER_NAME = "test-master";
+
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfMasterValueIsNull(){
+               FlinkJedisSentinelConfig.Builder builder = new 
FlinkJedisSentinelConfig.Builder();
+               Set<String> sentinels = new HashSet<>();
+               sentinels.add("127.0.0.1");
+               builder.setSentinels(sentinels).build();
+       }
+
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfSentinelsValueIsNull(){
+               FlinkJedisSentinelConfig.Builder builder = new 
FlinkJedisSentinelConfig.Builder();
+               builder.setMasterName(MASTER_NAME).build();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void shouldThrowNullPointExceptionIfSentinelsValueIsEmpty(){
+               FlinkJedisSentinelConfig.Builder builder = new 
FlinkJedisSentinelConfig.Builder();
+               Set<String> sentinels = new HashSet<>();
+               
builder.setMasterName(MASTER_NAME).setSentinels(sentinels).build();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
new file mode 100644
index 0000000..b0eee48
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.RedisSinkITCase;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisDataTypeDescriptionTest extends TestLogger {
+
+       @Test(expected=IllegalArgumentException.class)
+       public void 
shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType(){
+               RedisSinkITCase.RedisCommandMapper redisCommandMapper = new 
RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET);
+               redisCommandMapper.getCommandDescription();
+       }
+
+       @Test
+       public void shouldReturnNullForAdditionalDataType(){
+               RedisSinkITCase.RedisCommandMapper redisCommandMapper = new 
RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);
+               RedisCommandDescription redisDataTypeDescription = 
redisCommandMapper.getCommandDescription();
+               assertEquals(RedisDataType.LIST, 
redisDataTypeDescription.getCommand().getRedisDataType());
+               assertNull(redisDataTypeDescription.getAdditionalKey());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-twitter/pom.xml 
b/flink-connectors/flink-connector-twitter/pom.xml
new file mode 100644
index 0000000..27a966f
--- /dev/null
+++ b/flink-connectors/flink-connector-twitter/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-twitter_2.10</artifactId>
+       <name>flink-connector-twitter</name>
+
+       <packaging>jar</packaging>
+
+       <properties>
+               <hbc-core.version>2.2.0</hbc-core.version>
+       </properties>
+
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.twitter</groupId>
+                       <artifactId>hbc-core</artifactId>
+                       <version>${hbc-core.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <!-- Override artifactSet configuration to 
build fat-jar with all dependencies packed. -->
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <configuration>
+                                                       <artifactSet>
+                                                               <includes 
combine.children="append">
+                                                                       <!-- We 
include all dependencies that transitively depend on guava -->
+                                                                       
<include>com.twitter:hbc-core</include>
+                                                                       
<include>com.twitter:joauth</include>
+                                                                       
<include>org.apache.httpcomponents:httpclient</include>
+                                                                       
<include>org.apache.httpcomponents:httpcore</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
 
b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..66fa237
--- /dev/null
+++ 
b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Properties;
+
+import com.twitter.hbc.common.DelimitedStreamReader;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. This is not a parallel source because the Twitter API only allows
+ * two concurrent connections.
+ */
+public class TwitterSource extends RichSourceFunction<String> implements 
StoppableFunction {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TwitterSource.class);
+
+       private static final long serialVersionUID = 1L;
+
+       // ----- Required property keys
+
+       public static final String CONSUMER_KEY = "twitter-source.consumerKey";
+
+       public static final String CONSUMER_SECRET = 
"twitter-source.consumerSecret";
+
+       public static final String TOKEN = "twitter-source.token";
+
+       public static final String TOKEN_SECRET = "twitter-source.tokenSecret";
+
+       // ------ Optional property keys
+
+       public static final String CLIENT_NAME = "twitter-source.name";
+
+       public static final String CLIENT_HOSTS = "twitter-source.hosts";
+
+       public static final String CLIENT_BUFFER_SIZE = 
"twitter-source.bufferSize";
+
+       // ----- Fields set by the constructor
+
+       private final Properties properties;
+
+       private EndpointInitializer initializer = new SampleStatusesEndpoint();
+
+       // ----- Runtime fields
+       private transient BasicClient client;
+       private transient Object waitLock;
+       private transient boolean running = true;
+
+
+       /**
+        * Create {@link TwitterSource} for streaming
+        * 
+        * @param properties For the source
+        */
+       public TwitterSource(Properties properties) {
+               checkProperty(properties, CONSUMER_KEY);
+               checkProperty(properties, CONSUMER_SECRET);
+               checkProperty(properties, TOKEN);
+               checkProperty(properties, TOKEN_SECRET);
+
+               this.properties = properties;
+       }
+
+       private static void checkProperty(Properties p, String key) {
+               if(!p.containsKey(key)) {
+                       throw new IllegalArgumentException("Required property 
'" + key + "' not set.");
+               }
+       }
+
+
+       /**
+        * Set a custom endpoint initializer.
+        */
+       public void setCustomEndpointInitializer(EndpointInitializer 
initializer) {
+               Objects.requireNonNull(initializer, "Initializer has to be 
set");
+               ClosureCleaner.ensureSerializable(initializer);
+               this.initializer = initializer;
+       }
+
+       // ----- Source lifecycle
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               waitLock = new Object();
+       }
+
+
+       @Override
+       public void run(final SourceContext<String> ctx) throws Exception {
+               LOG.info("Initializing Twitter Streaming API connection");
+
+               StreamingEndpoint endpoint = initializer.createEndpoint();
+
+               Authentication auth = new 
OAuth1(properties.getProperty(CONSUMER_KEY),
+                       properties.getProperty(CONSUMER_SECRET),
+                       properties.getProperty(TOKEN),
+                       properties.getProperty(TOKEN_SECRET));
+
+               client = new ClientBuilder()
+                       .name(properties.getProperty(CLIENT_NAME, 
"flink-twitter-source"))
+                       .hosts(properties.getProperty(CLIENT_HOSTS, 
Constants.STREAM_HOST))
+                       .endpoint(endpoint)
+                       .authentication(auth)
+                       .processor(new HosebirdMessageProcessor() {
+                               public DelimitedStreamReader reader;
+
+                               @Override
+                               public void setup(InputStream input) {
+                                       reader = new 
DelimitedStreamReader(input, Constants.DEFAULT_CHARSET, 
Integer.parseInt(properties.getProperty(CLIENT_BUFFER_SIZE, "50000")));
+                               }
+
+                               @Override
+                               public boolean process() throws IOException, 
InterruptedException {
+                                       String line = reader.readLine();
+                                       ctx.collect(line);
+                                       return true;
+                               }
+                       })
+                       .build();
+
+               client.connect();
+               running = true;
+
+               LOG.info("Twitter Streaming API connection established 
successfully");
+
+               // just wait now
+               while(running) {
+                       synchronized (waitLock) {
+                               waitLock.wait(100L);
+                       }
+               }
+       }
+
+       @Override
+       public void close() {
+               this.running = false;
+               LOG.info("Closing source");
+               if (client != null) {
+                       // client seems to be thread-safe
+                       client.stop();
+               }
+               // leave main method
+               synchronized (waitLock) {
+                       waitLock.notify();
+               }
+       }
+
+       @Override
+       public void cancel() {
+               LOG.info("Cancelling Twitter source");
+               close();
+       }
+
+       @Override
+       public void stop() {
+               LOG.info("Stopping Twitter source");
+               close();
+       }
+
+       // ------ Custom endpoints
+
+       /**
+        * Implementing this interface allows users of this source to set a 
custom endpoint.
+        */
+       public interface EndpointInitializer {
+               StreamingEndpoint createEndpoint();
+       }
+
+       /**
+        * Default endpoint initializer returning the {@see 
StatusesSampleEndpoint}.
+        */
+       private static class SampleStatusesEndpoint implements 
EndpointInitializer, Serializable {
+               @Override
+               public StreamingEndpoint createEndpoint() {
+                       // this default endpoint initializer returns the sample 
endpoint: Returning a sample from the firehose (all tweets)
+                       StatusesSampleEndpoint endpoint = new 
StatusesSampleEndpoint();
+                       endpoint.stallWarnings(false);
+                       endpoint.delimited(false);
+                       return endpoint;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml 
b/flink-connectors/flink-hadoop-compatibility/pom.xml
new file mode 100644
index 0000000..5938560
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+       
+       <modelVersion>4.0.0</modelVersion>
+       
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-hadoop-compatibility_2.10</artifactId>
+       <name>flink-hadoop-compatibility</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-hadoop2</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+               
+       </dependencies>
+
+
+       <build>
+               <plugins>
+                       <!-- activate API compatibility checks -->
+                       <plugin>
+                               <groupId>com.github.siom79.japicmp</groupId>
+                               <artifactId>japicmp-maven-plugin</artifactId>
+                       </plugin>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the 
process-resources phase, so that dependencies on
+                                               scala classes can be resolved 
later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               
<exclude>org.scala-lang:scala-library</exclude>
+                                               
<exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               
<sourceInclude>**/*.scala</sourceInclude>
+                                               
<sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               
<artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Scala Code Style, most of the configuration done 
via plugin management -->
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <configuration>
+                                       
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
new file mode 100644
index 0000000..7bcb4bf
--- /dev/null
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+import org.apache.hadoop.io.Writable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for data types that extend Hadoop's {@link Writable} 
interface. The Writable
+ * interface defines the serialization and deserialization routines for the 
data type.
+ *
+ * @param <T> The type of the class represented by this type information.
+ */
+@Public
+public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> 
implements AtomicType<T> {
+       
+       private static final long serialVersionUID = 1L;
+       
+       private final Class<T> typeClass;
+
+       @PublicEvolving
+       public WritableTypeInfo(Class<T> typeClass) {
+               this.typeClass = checkNotNull(typeClass);
+
+               checkArgument(
+                       Writable.class.isAssignableFrom(typeClass) && 
!typeClass.equals(Writable.class),
+                       "WritableTypeInfo can only be used for subclasses of 
%s", Writable.class.getName());
+       }
+
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Override
+       @PublicEvolving
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
+               if(Comparable.class.isAssignableFrom(typeClass)) {
+                       return new WritableComparator(sortOrderAscending, 
typeClass);
+               }
+               else {
+                       throw new UnsupportedOperationException("Cannot create 
Comparator for "+typeClass.getCanonicalName()+". " +
+                                                                               
                        "Class does not implement Comparable interface.");
+               }
+       }
+
+       @Override
+       @PublicEvolving
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       @PublicEvolving
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       @PublicEvolving
+       public int getArity() {
+               return 1;
+       }
+       
+       @Override
+       @PublicEvolving
+       public int getTotalFields() {
+               return 1;
+       }
+
+       @Override
+       @PublicEvolving
+       public Class<T> getTypeClass() {
+               return this.typeClass;
+       }
+
+       @Override
+       @PublicEvolving
+       public boolean isKeyType() {
+               return Comparable.class.isAssignableFrom(typeClass);
+       }
+
+       @Override
+       @PublicEvolving
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
+               return new WritableSerializer<T>(typeClass);
+       }
+       
+       @Override
+       public String toString() {
+               return "WritableType<" + typeClass.getName() + ">";
+       }       
+       
+       @Override
+       public int hashCode() {
+               return typeClass.hashCode();
+       }
+       
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof WritableTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       WritableTypeInfo<T> writableTypeInfo = 
(WritableTypeInfo<T>) obj;
+
+                       return writableTypeInfo.canEqual(this) &&
+                               typeClass == writableTypeInfo.typeClass;
+
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof WritableTypeInfo;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       @PublicEvolving
+       static <T extends Writable> TypeInformation<T> 
getWritableTypeInfo(Class<T> typeClass) {
+               if (Writable.class.isAssignableFrom(typeClass) && 
!typeClass.equals(Writable.class)) {
+                       return new WritableTypeInfo<T>(typeClass);
+               }
+               else {
+                       throw new InvalidTypesException("The given class is no 
subclass of " + Writable.class.getName());
+               }
+       }
+       
+}

Reply via email to