http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
new file mode 100644
index 0000000..72bac1c
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Connection configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set, then
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer,
+ * Integer, Integer)} is used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean,
+ * Boolean, Integer, Integer, Integer, Integer)} is used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+       private String host;
+       private Integer port;
+       private String virtualHost;
+       private String username;
+       private String password;
+       private String uri;
+
+       private Integer networkRecoveryInterval;
+       private Boolean automaticRecovery;
+       private Boolean topologyRecovery;
+
+       private Integer connectionTimeout;
+       private Integer requestedChannelMax;
+       private Integer requestedFrameMax;
+       private Integer requestedHeartbeat;
+
+       /**
+       * Creates a configuration from an explicit host, port, virtual host and credentials.
+       *
+       * @param host host name
+       * @param port port
+       * @param virtualHost virtual host
+       * @param username username
+       * @param password password
+       * @param networkRecoveryInterval connection recovery interval in milliseconds
+       * @param automaticRecovery if automatic connection recovery
+       * @param topologyRecovery if topology recovery
+       * @param connectionTimeout connection timeout
+       * @param requestedChannelMax requested maximum channel number
+       * @param requestedFrameMax requested maximum frame size
+       * @param requestedHeartbeat requested heartbeat interval
+       * @throws NullPointerException if host, port, virtual host, username or password is null
+       */
+       private RMQConnectionConfig(String host, Integer port, String 
virtualHost, String username, String password,
+                                                               Integer 
networkRecoveryInterval, Boolean automaticRecovery,
+                                                               Boolean 
topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+                                                               Integer 
requestedFrameMax, Integer requestedHeartbeat){
+               Preconditions.checkNotNull(host, "host can not be null");
+               Preconditions.checkNotNull(port, "port can not be null");
+               Preconditions.checkNotNull(virtualHost, "virtualHost can not be 
null");
+               Preconditions.checkNotNull(username, "username can not be 
null");
+               Preconditions.checkNotNull(password, "password can not be 
null");
+               this.host = host;
+               this.port = port;
+               this.virtualHost = virtualHost;
+               this.username = username;
+               this.password = password;
+
+               this.networkRecoveryInterval = networkRecoveryInterval;
+               this.automaticRecovery = automaticRecovery;
+               this.topologyRecovery = topologyRecovery;
+               this.connectionTimeout = connectionTimeout;
+               this.requestedChannelMax = requestedChannelMax;
+               this.requestedFrameMax = requestedFrameMax;
+               this.requestedHeartbeat = requestedHeartbeat;
+       }
+
+       /**
+       * Creates a configuration that connects through a connection URI.
+       *
+       * @param uri the connection URI
+       * @param networkRecoveryInterval connection recovery interval in milliseconds
+       * @param automaticRecovery if automatic connection recovery
+       * @param topologyRecovery if topology recovery
+       * @param connectionTimeout connection timeout
+       * @param requestedChannelMax requested maximum channel number
+       * @param requestedFrameMax requested maximum frame size
+       * @param requestedHeartbeat requested heartbeat interval
+       * @throws NullPointerException if URI is null
+       */
+       private RMQConnectionConfig(String uri, Integer 
networkRecoveryInterval, Boolean automaticRecovery,
+                                                               Boolean 
topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
+                                                               Integer 
requestedFrameMax, Integer requestedHeartbeat){
+               Preconditions.checkNotNull(uri, "Uri can not be null");
+               this.uri = uri;
+
+               this.networkRecoveryInterval = networkRecoveryInterval;
+               this.automaticRecovery = automaticRecovery;
+               this.topologyRecovery = topologyRecovery;
+               this.connectionTimeout = connectionTimeout;
+               this.requestedChannelMax = requestedChannelMax;
+               this.requestedFrameMax = requestedFrameMax;
+               this.requestedHeartbeat = requestedHeartbeat;
+       }
+
+       /** @return the host to use for connections */
+       public String getHost() {
+               return host;
+       }
+
+       /**
+        * Retrieve the port.
+        * NOTE(review): {@code port} is an {@code Integer} that is only assigned by the
+        * host-based constructor; for a configuration created through the URI constructor it
+        * stays null, so the unboxing in this getter would throw a NullPointerException -
+        * confirm whether callers can reach this getter for URI-based configurations.
+        * @return the port to use for connections
+        */
+       public int getPort() {
+               return port;
+       }
+
+       /**
+        * Retrieve the virtual host.
+        * @return the virtual host to use when connecting to the broker
+        */
+       public String getVirtualHost() {
+               return virtualHost;
+       }
+
+       /**
+        * Retrieve the user name.
+        * @return the AMQP user name to use when connecting to the broker
+        */
+       public String getUsername() {
+               return username;
+       }
+
+       /**
+        * Retrieve the password.
+        * @return the password to use when connecting to the broker
+        */
+       public String getPassword() {
+               return password;
+       }
+
+       /**
+        * Retrieve the URI.
+        * @return the connection URI when connecting to the broker
+        */
+       public String getUri() {
+               return uri;
+       }
+
+       /**
+        * Returns automatic connection recovery interval in milliseconds.
+        * @return how long will automatic recovery wait before attempting to 
reconnect, in ms; default is 5000
+        */
+       public Integer getNetworkRecoveryInterval() {
+               return networkRecoveryInterval;
+       }
+
+       /**
+        * Returns true if automatic connection recovery is enabled, false 
otherwise
+        * @return true if automatic connection recovery is enabled, false 
otherwise
+        */
+       public Boolean isAutomaticRecovery() {
+               return automaticRecovery;
+       }
+
+       /**
+        * Returns true if topology recovery is enabled, false otherwise
+        * @return true if topology recovery is enabled, false otherwise
+        */
+       public Boolean isTopologyRecovery() {
+               return topologyRecovery;
+       }
+
+       /**
+        * Retrieve the connection timeout.
+        * @return the connection timeout, in milliseconds; zero for infinite
+        */
+       public Integer getConnectionTimeout() {
+               return connectionTimeout;
+       }
+
+       /**
+        * Retrieve the requested maximum channel number
+        * @return the initially requested maximum channel number; zero for 
unlimited
+        */
+       public Integer getRequestedChannelMax() {
+               return requestedChannelMax;
+       }
+
+       /**
+        * Retrieve the requested maximum frame size
+        * @return the initially requested maximum frame size, in octets; zero 
for unlimited
+        */
+       public Integer getRequestedFrameMax() {
+               return requestedFrameMax;
+       }
+
+       /**
+        * Retrieve the requested heartbeat interval.
+        * @return the initially requested heartbeat interval, in seconds; zero 
for none
+        */
+       public Integer getRequestedHeartbeat() {
+               return requestedHeartbeat;
+       }
+
+       /**
+        * Creates a new {@link ConnectionFactory} and applies this configuration to it.
+        * A non-null, non-empty URI takes precedence; otherwise host, port, virtual host,
+        * username and password are applied. Each optional setting is applied only when
+        * it is non-null.
+        * @return Connection Factory for RMQ
+        * @throws URISyntaxException if a malformed URI has been passed
+        * @throws NoSuchAlgorithmException rethrown from {@code ConnectionFactory#setUri(String)}
+        * @throws KeyManagementException rethrown from {@code ConnectionFactory#setUri(String)}
+        */
+       public ConnectionFactory getConnectionFactory() throws 
URISyntaxException,
+               NoSuchAlgorithmException, KeyManagementException {
+               ConnectionFactory factory = new ConnectionFactory();
+               if (this.uri != null && !this.uri.isEmpty()){
+                       try {
+                               factory.setUri(this.uri);
+                       } catch (URISyntaxException | NoSuchAlgorithmException 
| KeyManagementException e) {
+                               // log here, then propagate the original exception unchanged
+                               LOG.error("Failed to parse uri", e);
+                               throw e;
+                       }
+               } else {
+                       // uri is null or empty: use the individual connection fields instead
+                       factory.setHost(this.host);
+                       factory.setPort(this.port);
+                       factory.setVirtualHost(this.virtualHost);
+                       factory.setUsername(this.username);
+                       factory.setPassword(this.password);
+               }
+
+               // optional settings: a null value leaves the factory's own setting untouched
+               if (this.automaticRecovery != null) {
+                       
factory.setAutomaticRecoveryEnabled(this.automaticRecovery);
+               }
+               if (this.connectionTimeout != null) {
+                       factory.setConnectionTimeout(this.connectionTimeout);
+               }
+               if (this.networkRecoveryInterval != null) {
+                       
factory.setNetworkRecoveryInterval(this.networkRecoveryInterval);
+               }
+               if (this.requestedHeartbeat != null) {
+                       factory.setRequestedHeartbeat(this.requestedHeartbeat);
+               }
+               if (this.topologyRecovery != null) {
+                       
factory.setTopologyRecoveryEnabled(this.topologyRecovery);
+               }
+               if (this.requestedChannelMax != null) {
+                       
factory.setRequestedChannelMax(this.requestedChannelMax);
+               }
+               if (this.requestedFrameMax != null) {
+                       factory.setRequestedFrameMax(this.requestedFrameMax);
+               }
+
+               return factory;
+       }
+
+       /**
+        * The Builder Class for {@link RMQConnectionConfig}
+        */
+       public static class Builder {
+
+               private String host;
+               private Integer port;
+               private String virtualHost;
+               private String username;
+               private String password;
+
+               private Integer networkRecoveryInterval;
+               private Boolean automaticRecovery;
+               private Boolean topologyRecovery;
+
+               private Integer connectionTimeout;
+               private Integer requestedChannelMax;
+               private Integer requestedFrameMax;
+               private Integer requestedHeartbeat;
+
+               private String uri;
+
+               /**
+                * Set the target port.
+                * @param port the default port to use for connections
+                * @return the Builder
+                */
+               public Builder setPort(int port) {
+                       this.port = port;
+                       return this;
+               }
+
+               /**
+                * Set the target host.
+                * @param host the default host to use for connections
+                * @return the Builder
+                */
+               public Builder setHost(String host) {
+                       this.host = host;
+                       return this;
+               }
+
+               /**
+                * Set the virtual host.
+                * @param virtualHost the virtual host to use when connecting 
to the broker
+                * @return the Builder
+                */
+               public Builder setVirtualHost(String virtualHost) {
+                       this.virtualHost = virtualHost;
+                       return this;
+               }
+
+               /**
+                * Set the user name.
+                * @param username the AMQP user name to use when connecting to 
the broker
+                * @return the Builder
+                */
+               public Builder setUserName(String username) {
+                       this.username = username;
+                       return this;
+               }
+
+               /**
+                * Set the password.
+                * @param password the password to use when connecting to the 
broker
+                * @return the Builder
+                */
+               public Builder setPassword(String password) {
+                       this.password = password;
+                       return this;
+               }
+
+               /**
+                * Convenience method for setting the fields in an AMQP URI: host,
+                * port, username, password and virtual host.  If any part of the
+                * URI is omitted, the ConnectionFactory's corresponding variable
+                * is left unchanged.
+                * @param uri is the AMQP URI containing the data
+                * @return the Builder
+                */
+               public Builder setUri(String uri) {
+                       this.uri = uri;
+                       return this;
+               }
+
+               /**
+                * Enables or disables topology recovery
+                * @param topologyRecovery if true, enables topology recovery
+                * @return the Builder
+                */
+               public Builder setTopologyRecoveryEnabled(boolean 
topologyRecovery) {
+                       this.topologyRecovery = topologyRecovery;
+                       return this;
+               }
+
+               /**
+                * Set the requested heartbeat.
+                * @param requestedHeartbeat the initially requested heartbeat 
interval, in seconds; zero for none
+                * @return the Builder
+                */
+               public Builder setRequestedHeartbeat(int requestedHeartbeat) {
+                       this.requestedHeartbeat = requestedHeartbeat;
+                       return this;
+               }
+
+               /**
+                * Set the requested maximum frame size
+                * @param requestedFrameMax initially requested maximum frame 
size, in octets; zero for unlimited
+                * @return the Builder
+                */
+               public Builder setRequestedFrameMax(int requestedFrameMax) {
+                       this.requestedFrameMax = requestedFrameMax;
+                       return this;
+               }
+
+               /**
+                * Set the requested maximum channel number
+                * @param requestedChannelMax initially requested maximum channel number; zero for unlimited
+                * @return the Builder
+                */
+               public Builder setRequestedChannelMax(int requestedChannelMax) {
+                       this.requestedChannelMax = requestedChannelMax;
+                       return this;
+               }
+
+               /**
+                * Sets connection recovery interval. Default is 5000.
+                * @param networkRecoveryInterval how long will automatic 
recovery wait before attempting to reconnect, in ms
+                * @return the Builder
+                */
+               public Builder setNetworkRecoveryInterval(int 
networkRecoveryInterval) {
+                       this.networkRecoveryInterval = networkRecoveryInterval;
+                       return this;
+               }
+
+               /**
+                * Set the connection timeout.
+                * @param connectionTimeout connection establishment timeout in 
milliseconds; zero for infinite
+                * @return the Builder
+                */
+               public Builder setConnectionTimeout(int connectionTimeout) {
+                       this.connectionTimeout = connectionTimeout;
+                       return this;
+               }
+
+               /**
+                * Enables or disables automatic connection recovery
+                * @param automaticRecovery if true, enables connection recovery
+                * @return the Builder
+                */
+               public Builder setAutomaticRecovery(boolean automaticRecovery) {
+                       this.automaticRecovery = automaticRecovery;
+                       return this;
+               }
+
+               /**
+                * The Builder method.
+                * If a non-empty URI has been set, it is used to initialize the client connection using
+                * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer,
+                * Integer, Integer)}.
+                *
+                * Otherwise (URI is null or empty) the host, port, vHost, username, password combination
+                * is used to initialize the connection, using
+                * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String,
+                * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}.
+                *
+                * An empty URI is treated like an unset one: {@link RMQConnectionConfig#getConnectionFactory()}
+                * ignores empty URIs and falls back to the host based fields, so those fields have to be
+                * validated here instead of failing later with an unrelated NullPointerException.
+                * @return RMQConnectionConfig
+                * @throws NullPointerException if no usable URI is set and host, port, virtual host,
+                *         username or password is null
+                */
+               public RMQConnectionConfig build(){
+                       // same condition as getConnectionFactory() so both agree on which mode is active
+                       if (this.uri != null && !this.uri.isEmpty()) {
+                               return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
+                                       this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
+                                       this.requestedFrameMax, this.requestedHeartbeat);
+                       } else {
+                               return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
+                                       this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
+                                       this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
new file mode 100644
index 0000000..b63c835
--- /dev/null
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.SerializedCheckpointData;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.modules.junit4.PowerMockRunner;
+import com.rabbitmq.client.Connection;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests for the RMQSource. The source supports three operation modes.
+ * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and the deduplication mechanism in
+ *    {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}.
+ * 2) At-least-once (when checkpointed) with RabbitMQ transactions but not deduplication.
+ * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+ *
+ * This test assumes that the message ids are increasing monotonically. That doesn't have to be the
+ * case. The correlation id is used to uniquely identify messages.
+ */
+@RunWith(PowerMockRunner.class)
+public class RMQSourceTest {
+
+       private RMQSource<String> source;
+
+       private Configuration config = new Configuration();
+
+       private Thread sourceThread;
+
+       private volatile long messageId;
+
+       private boolean generateCorrelationIds;
+
+       private volatile Exception exception;
+
+       @Before
+       public void beforeTest() throws Exception {
+
+               source = new RMQTestSource();
+               source.open(config);
+
+               messageId = 0;
+               generateCorrelationIds = true;
+
+               sourceThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       source.run(new DummySourceContext());
+                               } catch (Exception e) {
+                                       exception = e;
+                               }
+                       }
+               });
+       }
+
+       @After
+       public void afterTest() throws Exception {
+               source.cancel();
+               sourceThread.join();
+       }
+
+       /**
+        * Verifies that opening the source fails with a descriptive exception when the
+        * mocked connection returns no channel ({@code createChannel()} yields null).
+        */
+       @Test
+       public void throwExceptionIfConnectionFactoryReturnNull() throws Exception {
+               RMQConnectionConfig connectionConfig = Mockito.mock(RMQConnectionConfig.class);
+               ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
+               Connection connection = Mockito.mock(Connection.class);
+               Mockito.when(connectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+               Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+               Mockito.when(connection.createChannel()).thenReturn(null);
+
+               RMQSource<String> rmqSource = new RMQSource<>(
+                       connectionConfig, "queueDummy", true, new StringDeserializationScheme());
+               try {
+                       rmqSource.open(new Configuration());
+                       // fail() throws an AssertionError, which the RuntimeException handler below does
+                       // not catch; without it the test would silently pass if open() stopped throwing
+                       fail("Expected a RuntimeException because no RabbitMQ channel is available");
+               } catch (RuntimeException ex) {
+                       assertEquals("None of RabbitMQ channels are available", ex.getMessage());
+               }
+       }
+
+       @Test
+       public void testCheckpointing() throws Exception {
+               source.autoAck = false;
+               sourceThread.start();
+
+               Thread.sleep(5);
+
+               final Random random = new Random(System.currentTimeMillis());
+               int numSnapshots = 50;
+               long previousSnapshotId;
+               long lastSnapshotId = 0;
+
+               long totalNumberOfAcks = 0;
+
+               for (int i=0; i < numSnapshots; i++) {
+                       long snapshotId = random.nextLong();
+                       SerializedCheckpointData[] data;
+
+                       synchronized (DummySourceContext.lock) {
+                               data = source.snapshotState(snapshotId, 
System.currentTimeMillis());
+                               previousSnapshotId = lastSnapshotId;
+                               lastSnapshotId = messageId;
+                       }
+                       // let some time pass
+                       Thread.sleep(5);
+
+                       // check if the correct number of messages have been 
snapshotted
+                       final long numIds = lastSnapshotId - previousSnapshotId;
+                       assertEquals(numIds, data[0].getNumIds());
+                       // deserialize and check if the last id equals the last 
snapshotted id
+                       ArrayDeque<Tuple2<Long, List<String>>> deque = 
SerializedCheckpointData.toDeque(data, new StringSerializer());
+                       List<String> messageIds = deque.getLast().f1;
+                       if (messageIds.size() > 0) {
+                               assertEquals(lastSnapshotId, (long) 
Long.valueOf(messageIds.get(messageIds.size() - 1)));
+                       }
+
+                       // check if the messages are being acknowledged and the transaction committed
+                       synchronized (DummySourceContext.lock) {
+                               source.notifyCheckpointComplete(snapshotId);
+                       }
+                       totalNumberOfAcks += numIds;
+
+               }
+
+               Mockito.verify(source.channel, Mockito.times((int) 
totalNumberOfAcks)).basicAck(Mockito.anyLong(), Mockito.eq(false));
+               Mockito.verify(source.channel, 
Mockito.times(numSnapshots)).txCommit();
+
+       }
+
+       /**
+        * Checks whether recurring ids are processed again (they shouldn't be).
+        */
+       @Test
+       public void testDuplicateId() throws Exception {
+               source.autoAck = false;
+               sourceThread.start();
+
+               while (messageId < 10) {
+                       // wait until messages have been processed
+                       Thread.sleep(5);
+               }
+
+               long oldMessageId;
+               synchronized (DummySourceContext.lock) {
+                       oldMessageId = messageId;
+                       messageId = 0;
+               }
+
+               while (messageId < 10) {
+                       // process again
+                       Thread.sleep(5);
+               }
+
+               synchronized (DummySourceContext.lock) {
+                       assertEquals(Math.max(messageId, oldMessageId), 
DummySourceContext.numElementsCollected);
+               }
+       }
+
+
+       /**
+        * The source should not acknowledge ids in auto-commit mode or check 
for previously acknowledged ids
+        */
+       @Test
+       public void testCheckpointingDisabled() throws Exception {
+               source.autoAck = true;
+               sourceThread.start();
+
+               while (DummySourceContext.numElementsCollected < 50) {
+                       // wait until messages have been processed
+                       Thread.sleep(5);
+               }
+
+               // see addId in RMQTestSource.addId for the assert
+       }
+
+       /**
+        * Tests error reporting in case of invalid correlation ids
+        */
+       @Test
+       public void testCorrelationIdNotSet() throws InterruptedException {
+               generateCorrelationIds = false;
+               source.autoAck = false;
+               sourceThread.start();
+
+               sourceThread.join();
+
+               assertNotNull(exception);
+               assertTrue(exception instanceof NullPointerException);
+       }
+
+       /**
+        * Tests whether constructor params are passed correctly.
+        */
+       @Test
+       public void testConstructorParams() throws Exception {
+               // verify construction params
+               RMQConnectionConfig.Builder builder = new 
RMQConnectionConfig.Builder();
+               
builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/");
+               ConstructorTestClass testObj = new ConstructorTestClass(
+                       builder.build(), "queueTest", false, new 
StringDeserializationScheme());
+
+               try {
+                       testObj.open(new Configuration());
+               } catch (Exception e) {
+                       // connection fails but check if args have been passed 
correctly
+               }
+
+               assertEquals("hostTest", testObj.getFactory().getHost());
+               assertEquals(999, testObj.getFactory().getPort());
+               assertEquals("userTest", testObj.getFactory().getUsername());
+               assertEquals("passTest", testObj.getFactory().getPassword());
+       }
+
+       /**
+        * {@link RMQSource} that hands out a Mockito spy of the connection factory
+        * created from the configuration it was constructed with. The spy throws a
+        * {@link RuntimeException} from {@code newConnection()}, so no broker is
+        * contacted, while its getters still report the configured values.
+        */
+       private static class ConstructorTestClass extends RMQSource<String> {
+
+               private final ConnectionFactory factory;
+
+               /**
+                * @param rmqConnectionConfig connection settings; the spied factory is derived from it
+                * @param queueName queue name forwarded to {@link RMQSource}
+                * @param usesCorrelationId correlation-id flag forwarded to {@link RMQSource}
+                * @param deserializationSchema schema forwarded to {@link RMQSource}
+                * @throws Exception if the connection factory cannot be created from the config
+                */
+               public ConstructorTestClass(RMQConnectionConfig rmqConnectionConfig,
+                               String queueName,
+                               boolean usesCorrelationId,
+                               DeserializationSchema<String> deserializationSchema) throws Exception {
+                       super(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema);
+                       // Build the factory from the config that was actually passed in. Rebuilding it
+                       // from hard-coded values would make the caller's assertions pass for any config.
+                       factory = Mockito.spy(rmqConnectionConfig.getConnectionFactory());
+                       try {
+                               Mockito.doThrow(new RuntimeException()).when(factory).newConnection();
+                       } catch (IOException e) {
+                               fail("Failed to stub connection method");
+                       }
+               }
+
+               /** Returns the spied factory instead of letting the source create a real one. */
+               @Override
+               protected ConnectionFactory setupConnectionFactory() {
+                       return factory;
+               }
+
+               /** Exposes the spied factory so tests can inspect its settings. */
+               public ConnectionFactory getFactory() {
+                       return factory;
+               }
+       }
+
+       /**
+        * Deserialization schema for the tests: turns the raw message bytes into a
+        * {@link String} and never signals end of stream.
+        */
+       private static class StringDeserializationScheme implements DeserializationSchema<String> {
+
+               /**
+                * Decodes the message with the platform default charset after a one
+                * millisecond pause.
+                *
+                * @param message raw message body
+                * @return the decoded string
+                */
+               @Override
+               public String deserialize(byte[] message) throws IOException {
+                       try {
+                               // wait a bit to not cause too much cpu load
+                               Thread.sleep(1);
+                       } catch (InterruptedException e) {
+                               // restore the interrupt flag so the calling thread can still observe it
+                               Thread.currentThread().interrupt();
+                       }
+                       return new String(message);
+               }
+
+               /** Always {@code false}: no element ends the stream. */
+               @Override
+               public boolean isEndOfStream(String nextElement) {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<String> getProducedType() {
+                       return TypeExtractor.getForClass(String.class);
+               }
+       }
+
+       /**
+        * {@link RMQSource} for the tests in which the connection factory, connection,
+        * channel, consumer, deliveries, envelope and message properties are all
+        * Mockito mocks, so no RabbitMQ broker is needed. It is created with
+        * correlation ids enabled and reads {@code generateCorrelationIds} and
+        * {@code messageId}, which are declared outside this class.
+        */
+       private class RMQTestSource extends RMQSource<String> {
+
+               public RMQTestSource() {
+                       super(new 
RMQConnectionConfig.Builder().setHost("hostTest")
+                                       
.setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build()
+                               , "queueDummy", true, new 
StringDeserializationScheme());
+               }
+
+               /**
+                * Opens the source and then replaces {@code consumer} with a mock that
+                * returns the same mocked delivery on every {@code nextDelivery()} call.
+                */
+               @Override
+               public void open(Configuration config) throws Exception {
+                       super.open(config);
+
+                       // NOTE(review): 'consumer' is presumably a field inherited from RMQSource - confirm
+                       consumer = Mockito.mock(QueueingConsumer.class);
+
+                       // Mock for delivery
+                       final QueueingConsumer.Delivery deliveryMock = 
Mockito.mock(QueueingConsumer.Delivery.class);
+                       
Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
+
+                       try {
+                               
Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);
+                       } catch (InterruptedException e) {
+                               fail("Couldn't setup up deliveryMock");
+                       }
+
+                       // Mock for envelope
+                       Envelope envelope = Mockito.mock(Envelope.class);
+                       
Mockito.when(deliveryMock.getEnvelope()).thenReturn(envelope);
+
+                       // every request for the delivery tag advances messageId and returns the new value
+                       Mockito.when(envelope.getDeliveryTag()).thenAnswer(new 
Answer<Long>() {
+                               @Override
+                               public Long answer(InvocationOnMock invocation) 
throws Throwable {
+                                       return ++messageId;
+                               }
+                       });
+
+                       // Mock for properties
+                       AMQP.BasicProperties props = 
Mockito.mock(AMQP.BasicProperties.class);
+                       
Mockito.when(deliveryMock.getProperties()).thenReturn(props);
+
+                       // correlation id is the current messageId as text, or null when generation is switched off
+                       Mockito.when(props.getCorrelationId()).thenAnswer(new 
Answer<String>() {
+                               @Override
+                               public String answer(InvocationOnMock 
invocation) throws Throwable {
+                                       return generateCorrelationIds ? "" + 
messageId : null;
+                               }
+                       });
+
+               }
+
+               /**
+                * Returns a mocked factory whose connection is a mock that in turn
+                * creates a mocked channel.
+                */
+               @Override
+               protected ConnectionFactory setupConnectionFactory() {
+                       ConnectionFactory connectionFactory = 
Mockito.mock(ConnectionFactory.class);
+                       Connection connection = Mockito.mock(Connection.class);
+                       try {
+                               
Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+                               
Mockito.when(connection.createChannel()).thenReturn(Mockito.mock(Channel.class));
+                       } catch (IOException e) {
+                               fail("Test environment couldn't be created.");
+                       }
+                       return connectionFactory;
+               }
+
+               /** Returns a fresh mocked runtime context on every call. */
+               @Override
+               public RuntimeContext getRuntimeContext() {
+                       return Mockito.mock(StreamingRuntimeContext.class);
+               }
+
+               /** Asserts that auto-ack is off whenever an id is added, then delegates to the parent. */
+               @Override
+               protected boolean addId(String uid) {
+                       assertEquals(false, autoAck);
+                       return super.addId(uid);
+               }
+       }
+
+       /**
+        * Minimal {@link SourceFunction.SourceContext} that only counts the elements
+        * handed to {@link #collect(String)}. Timestamped elements and watermarks
+        * are ignored.
+        */
+       private static class DummySourceContext implements SourceFunction.SourceContext<String> {
+
+               private static long numElementsCollected;
+
+               private static final Object lock = new Object();
+
+               public DummySourceContext() {
+                       // the counter is static, so every new context restarts the count
+                       numElementsCollected = 0;
+               }
+
+               @Override
+               public void collect(String element) {
+                       numElementsCollected += 1;
+               }
+
+               @Override
+               public void collectWithTimestamp(String element, long timestamp) {
+                       // not counted
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       // watermarks are ignored
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return lock;
+               }
+
+               @Override
+               public void close() {
+                       // nothing to release
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
new file mode 100644
index 0000000..40985ce
--- /dev/null
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Tests the mandatory-parameter checks and the connection timeout handling of
+ * {@link RMQConnectionConfig}.
+ */
+public class RMQConnectionConfigTest {
+
+       /** A configuration without a host is expected to be rejected with a {@link NullPointerException}. */
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfHostIsNull() throws NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+                       .setPort(1000).setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/").build();
+               connectionConfig.getConnectionFactory();
+       }
+
+       /** A configuration without a port is expected to be rejected with a {@link NullPointerException}. */
+       @Test(expected = NullPointerException.class)
+       public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+                       .setHost("localhost").setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/").build();
+               connectionConfig.getConnectionFactory();
+       }
+
+       /**
+        * Without an explicit connection timeout the factory must keep the RabbitMQ
+        * default. The configuration is otherwise complete (host and port set), so
+        * the assertion is actually reached instead of the test ending on the
+        * missing-port {@link NullPointerException}.
+        */
+       @Test
+       public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+                       .setHost("localhost").setPort(5000).setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/").build();
+               ConnectionFactory factory = connectionConfig.getConnectionFactory();
+               assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, factory.getConnectionTimeout());
+       }
+
+       /** An explicitly configured connection timeout must be applied to the factory. */
+       @Test
+       public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
+               KeyManagementException, URISyntaxException {
+               RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+                       .setHost("localhost").setPort(5000).setUserName("guest")
+                       .setPassword("guest").setVirtualHost("/")
+                       .setConnectionTimeout(5000).build();
+               ConnectionFactory factory = connectionConfig.getConnectionFactory();
+               assertEquals(5000, factory.getConnectionTimeout());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
new file mode 100644
index 0000000..199cd1e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for {@link RMQSink} that run against mocked RabbitMQ connection
+ * factory, connection and channel objects.
+ */
+public class RMQSinkTest {
+
+       private static final String QUEUE_NAME = "queue";
+       private static final String MESSAGE_STR = "msg";
+       private static final byte[] MESSAGE = new byte[1];
+
+       private RMQConnectionConfig rmqConnectionConfig;
+       private ConnectionFactory connectionFactory;
+       private Connection connection;
+       private Channel channel;
+       private SerializationSchema<String> serializationSchema;
+
+
+       /** Wires config -> factory -> connection -> channel mocks before every test. */
+       @Before
+       public void before() throws Exception {
+               serializationSchema = spy(new DummySerializationSchema());
+               rmqConnectionConfig = mock(RMQConnectionConfig.class);
+               connectionFactory = mock(ConnectionFactory.class);
+               connection = mock(Connection.class);
+               channel = mock(Channel.class);
+
+               when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+               when(connectionFactory.newConnection()).thenReturn(connection);
+               when(connection.createChannel()).thenReturn(channel);
+       }
+
+       @Test
+       public void openCallDeclaresQueue() throws Exception {
+               createRMQSink();
+
+               verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
+       }
+
+       /**
+        * Opening the sink must fail with a descriptive {@link RuntimeException}
+        * when the connection cannot provide a channel.
+        */
+       @Test
+       public void throwExceptionIfChannelIsNull() throws Exception {
+               when(connection.createChannel()).thenReturn(null);
+               try {
+                       createRMQSink();
+               } catch (RuntimeException ex) {
+                       assertEquals("None of RabbitMQ channels are available", ex.getMessage());
+                       return;
+               }
+               // without this the test would also pass when open() silently accepts a null channel
+               org.junit.Assert.fail("Expected a RuntimeException because no channel is available");
+       }
+
+       /** Creates a sink on the mocked configuration and opens it. */
+       private RMQSink<String> createRMQSink() throws Exception {
+               RMQSink<String> rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+               rmqSink.open(new Configuration());
+               return rmqSink;
+       }
+
+       @Test
+       public void invokePublishBytesToQueue() throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+
+               rmqSink.invoke(MESSAGE_STR);
+               verify(serializationSchema).serialize(MESSAGE_STR);
+               verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void exceptionDuringPublishingIsNotIgnored() throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+
+               doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+               rmqSink.invoke(MESSAGE_STR);
+       }
+
+       /** With log-failures-only enabled, a publishing error must not propagate out of invoke. */
+       @Test
+       public void exceptionDuringPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+               rmqSink.setLogFailuresOnly(true);
+
+               doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+               rmqSink.invoke(MESSAGE_STR);
+       }
+
+       @Test
+       public void closeAllResources() throws Exception {
+               RMQSink<String> rmqSink = createRMQSink();
+
+               rmqSink.close();
+
+               verify(channel).close();
+               verify(connection).close();
+       }
+
+       /** Serialization schema that maps every element to the shared {@code MESSAGE} bytes. */
+       private static class DummySerializationSchema implements SerializationSchema<String> {
+               @Override
+               public byte[] serialize(String element) {
+                       return MESSAGE;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/pom.xml 
b/flink-connectors/flink-connector-redis/pom.xml
new file mode 100644
index 0000000..a348f31
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-redis_2.10</artifactId>
+       <name>flink-connector-redis</name>
+
+       <packaging>jar</packaging>
+
+       <properties>
+               <jedis.version>2.8.0</jedis.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>redis.clients</groupId>
+                       <artifactId>jedis</artifactId>
+                       <version>${jedis.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.github.kstyrc</groupId>
+                       <artifactId>embedded-redis</artifactId>
+                       <version>0.6</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
new file mode 100644
index 0000000..f6b0fd7
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link 
RedisMapper}.
+ * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, 
the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when 
you want to connect to Sentinel.
+ * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you 
want to connect to
+ * a Redis Cluster.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, 
String>> {
+ *
+ *     private RedisCommand redisCommand;
+ *
+ *     public RedisExampleMapper(RedisCommand redisCommand){
+ *             this.redisCommand = redisCommand;
+ *     }
+ *     public RedisCommandDescription getCommandDescription() {
+ *             return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ *     }
+ *     public String getKeyFromData(Tuple2<String, String> data) {
+ *             return data.f0;
+ *     }
+ *     public String getValueFromData(Tuple2<String, String> data) {
+ *             return data.f1;
+ *     }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink<String>(jedisPoolConfig, new 
RedisExampleMapper(RedisCommand.LPUSH));
+ *}</pre>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+       /**
+        * This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+        * Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+        * But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+        * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+        * {@code additionalKey} used as hash name for {@link 
RedisDataType#HASH}
+        * <p>For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+        * {@code additionalKey} used as set name for {@link 
RedisDataType#SORTED_SET}
+        */
+       private String additionalKey;
+       private RedisMapper<IN> redisSinkMapper;
+       private RedisCommand redisCommand;
+
+       private FlinkJedisConfigBase flinkJedisConfigBase;
+       private RedisCommandsContainer redisCommandsContainer;
+
+       /**
+        * Creates a new {@link RedisSink} that connects to the Redis server.
+        *
+        * @param flinkJedisConfigBase The configuration of {@link 
FlinkJedisConfigBase}
+        * @param redisSinkMapper This is used to generate Redis command and 
key value from incoming elements.
+        */
+       public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, 
RedisMapper<IN> redisSinkMapper) {
+               Preconditions.checkNotNull(flinkJedisConfigBase, "Redis 
connection pool config should not be null");
+               Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can 
not be null");
+               
Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis 
Mapper data type description can not be null");
+
+               this.flinkJedisConfigBase = flinkJedisConfigBase;
+
+               this.redisSinkMapper = redisSinkMapper;
+               RedisCommandDescription redisCommandDescription = 
redisSinkMapper.getCommandDescription();
+               this.redisCommand = redisCommandDescription.getCommand();
+               this.additionalKey = redisCommandDescription.getAdditionalKey();
+       }
+
+       /**
+        * Called when new data arrives to the sink, and forwards it to Redis 
channel.
+        * Depending on the specified Redis data type (see {@link 
RedisDataType}),
+        * a different Redis command will be applied.
+        * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, 
HSET, ZADD.
+        *
+        * @param input The incoming data
+        */
+       @Override
+       public void invoke(IN input) throws Exception {
+               String key = redisSinkMapper.getKeyFromData(input);
+               String value = redisSinkMapper.getValueFromData(input);
+
+               switch (redisCommand) {
+                       case RPUSH:
+                               this.redisCommandsContainer.rpush(key, value);
+                               break;
+                       case LPUSH:
+                               this.redisCommandsContainer.lpush(key, value);
+                               break;
+                       case SADD:
+                               this.redisCommandsContainer.sadd(key, value);
+                               break;
+                       case SET:
+                               this.redisCommandsContainer.set(key, value);
+                               break;
+                       case PFADD:
+                               this.redisCommandsContainer.pfadd(key, value);
+                               break;
+                       case PUBLISH:
+                               this.redisCommandsContainer.publish(key, value);
+                               break;
+                       case ZADD:
+                               
this.redisCommandsContainer.zadd(this.additionalKey, value, key);
+                               break;
+                       case HSET:
+                               
this.redisCommandsContainer.hset(this.additionalKey, key, value);
+                               break;
+                       default:
+                               throw new IllegalArgumentException("Cannot 
process such data type: " + redisCommand);
+               }
+       }
+
+       /**
+        * Initializes the connection to Redis by either cluster or sentinels 
or single server.
+        *
+        * @throws IllegalArgumentException if jedisPoolConfig, 
jedisClusterConfig and jedisSentinelConfig are all null
+     */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               try {
+                       this.redisCommandsContainer = 
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+                       this.redisCommandsContainer.open();
+               } catch (Exception e) {
+                       LOG.error("Redis has not been properly initialized: ", 
e);
+                       throw e;
+               }
+       }
+
+       /**
+        * Closes commands container.
+        * @throws IOException if command container is unable to close.
+        */
+       @Override
+       public void close() throws IOException {
+               if (redisCommandsContainer != null) {
+                       redisCommandsContainer.close();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
new file mode 100644
index 0000000..6e6cfe5
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis cluster.
+ *
+ * <p>Instances are created through {@link Builder}; the constructor is private.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+       private static final long serialVersionUID = 1L;
+
+       private final Set<InetSocketAddress> nodes;
+       private final int maxRedirections;
+
+
+       /**
+        * Jedis cluster configuration.
+        * The set of nodes is mandatory: a {@code null} set is rejected with a
+        * NullPointerException and an empty set with an IllegalArgumentException.
+        * The given set is copied, so later changes to it do not affect this configuration.
+        *
+        * @param nodes set of node addresses for JedisCluster
+        * @param connectionTimeout socket / connection timeout. The default is 2000
+        * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+        * @param maxTotal the maximum number of objects that can be allocated by the pool
+        * @param maxIdle the cap on the number of "idle" instances in the pool
+        * @param minIdle the minimum number of idle objects to maintain in the pool
+        * @throws NullPointerException if parameter {@code nodes} is {@code null}
+        * @throws IllegalArgumentException if {@code nodes} is empty, or if the base class
+        *         rejects {@code connectionTimeout}, {@code maxTotal}, {@code maxIdle} or
+        *         {@code minIdle} because it is negative
+        */
+       private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int 
connectionTimeout, int maxRedirections,
+                                                                       int 
maxTotal, int maxIdle, int minIdle) {
+               super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+               Preconditions.checkNotNull(nodes, "Node information should be 
presented");
+               Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster 
hosts should not be empty");
+               this.nodes = new HashSet<>(nodes);
+               this.maxRedirections = maxRedirections;
+       }
+
+
+
+       /**
+        * Returns nodes.
+        *
+        * <p>A new set is built on every call. The host of each entry is read with
+        * {@link InetSocketAddress#getHostString()}, which, unlike {@code getHostName()},
+        * does not attempt a reverse name lookup for addresses created from an IP literal.
+        *
+        * @return set of node information
+        */
+       public Set<HostAndPort> getNodes() {
+               Set<HostAndPort> ret = new HashSet<>();
+               for (InetSocketAddress node : nodes) {
+                       ret.add(new HostAndPort(node.getHostString(), node.getPort()));
+               }
+               return ret;
+       }
+
+       /**
+        * Returns limit of redirection.
+        *
+        * @return limit of redirection
+        */
+       public int getMaxRedirections() {
+               return maxRedirections;
+       }
+
+
+       /**
+        * Builder for initializing  {@link FlinkJedisClusterConfig}.
+        *
+        * <p>Only the nodes have no default; {@link #build()} fails when they were not set.
+        */
+       public static class Builder {
+               private Set<InetSocketAddress> nodes;
+               private int timeout = Protocol.DEFAULT_TIMEOUT;
+               private int maxRedirections = 5;
+               private int maxTotal = 
GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+               private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+               private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+               /**
+                * Sets the set of nodes.
+                *
+                * @param nodes set of node addresses
+                * @return Builder itself
+                */
+               public Builder setNodes(Set<InetSocketAddress> nodes) {
+                       this.nodes = nodes;
+                       return this;
+               }
+
+               /**
+                * Sets socket / connection timeout.
+                *
+                * @param timeout socket / connection timeout, default value is 2000
+                * @return Builder itself
+                */
+               public Builder setTimeout(int timeout) {
+                       this.timeout = timeout;
+                       return this;
+               }
+
+               /**
+                * Sets limit of redirection.
+                *
+                * @param maxRedirections limit of redirection, default value is 5
+                * @return Builder itself
+                */
+               public Builder setMaxRedirections(int maxRedirections) {
+                       this.maxRedirections = maxRedirections;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxTotal} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxTotal the maximum number of objects that can be allocated by the pool,
+                *                 default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxTotal(int maxTotal) {
+                       this.maxTotal = maxTotal;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxIdle the cap on the number of "idle" instances in the pool,
+                *                default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxIdle(int maxIdle) {
+                       this.maxIdle = maxIdle;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code minIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param minIdle the minimum number of idle objects to maintain in the pool,
+                *                default value is 0
+                * @return Builder itself
+                */
+               public Builder setMinIdle(int minIdle) {
+                       this.minIdle = minIdle;
+                       return this;
+               }
+
+               /**
+                * Builds JedisClusterConfig.
+                *
+                * @return JedisClusterConfig
+                * @throws NullPointerException if no nodes were set
+                * @throws IllegalArgumentException if the node set is empty or a numeric
+                *         setting is rejected by the base class
+                */
+               public FlinkJedisClusterConfig build() {
+                       return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle);
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "JedisClusterConfig{" +
+                       "nodes=" + nodes +
+                       ", timeout=" + connectionTimeout +
+                       ", maxRedirections=" + maxRedirections +
+                       ", maxTotal=" + maxTotal +
+                       ", maxIdle=" + maxIdle +
+                       ", minIdle=" + minIdle +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
new file mode 100644
index 0000000..a2489b8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ *
+ * <p>Holds the connection timeout and the pool sizing values ({@code maxTotal},
+ * {@code maxIdle}, {@code minIdle}) that the concrete configurations share. The
+ * constructor checks each of the four values with {@code Preconditions.checkArgument}
+ * and accepts only values that are zero or greater. All four fields are final.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       protected final int maxTotal;
+       protected final int maxIdle;
+       protected final int minIdle;
+       protected final int connectionTimeout;
+
+       protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle){
+               Preconditions.checkArgument(connectionTimeout >= 0, "connection 
timeout can not be negative");
+               Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can 
not be negative");
+               Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can 
not be negative");
+               Preconditions.checkArgument(minIdle >= 0, "minIdle value can 
not be negative");
+               this.connectionTimeout = connectionTimeout;
+               this.maxTotal = maxTotal;
+               this.maxIdle = maxIdle;
+               this.minIdle = minIdle;
+       }
+
+       /**
+        * Returns the connection timeout this configuration was created with.
+        *
+        * @return connection timeout
+        */
+       public int getConnectionTimeout() {
+               return connectionTimeout;
+       }
+
+       /**
+        * Get the value for the {@code maxTotal} configuration attribute
+        * for pools to be created with this configuration instance.
+        *
+        * @return  The setting of {@code maxTotal} for this
+        *          configuration instance
+        * @see GenericObjectPoolConfig#getMaxTotal()
+        */
+       public int getMaxTotal() {
+               return maxTotal;
+       }
+
+       /**
+        * Get the value for the {@code maxIdle} configuration attribute
+        * for pools to be created with this configuration instance.
+        *
+        * @return  The setting of {@code maxIdle} for this
+        *          configuration instance
+        * @see GenericObjectPoolConfig#getMaxIdle()
+        */
+       public int getMaxIdle() {
+               return maxIdle;
+       }
+
+       /**
+        * Get the value for the {@code minIdle} configuration attribute
+        * for pools to be created with this configuration instance.
+        *
+        * @return  The setting of {@code minIdle} for this
+        *          configuration instance
+        * @see GenericObjectPoolConfig#getMinIdle()
+        */
+       public int getMinIdle() {
+               return minIdle;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
new file mode 100644
index 0000000..d261a35
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+/**
+ * Configuration for Jedis pool.
+ *
+ * <p>Instances are created through {@link Builder}; the constructor is private.
+ * The text produced by {@code toString()} lists host, port, timeout, database and
+ * the pool sizes, and leaves the password out.
+ */
+public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
+
+       private static final long serialVersionUID = 1L;
+
+       private final String host;
+       private final int port;
+       private final int database;
+       private final String password;
+
+
+       /**
+        * Jedis pool configuration.
+        * The host is mandatory, and when host is not set, it throws NullPointerException.
+        * The port, the database index and the password are stored as given, without
+        * any check in this constructor.
+        *
+        * @param host hostname or IP
+        * @param port port, default value is 6379
+        * @param connectionTimeout socket / connection timeout, default value is 2000 milliseconds
+        * @param password password, if any
+        * @param database database index
+        * @param maxTotal the maximum number of objects that can be allocated by the pool,
+        *                 default value is 8
+        * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+        * @param minIdle the minimum number of idle objects to maintain in the pool,
+        *                default value is 0
+        * @throws NullPointerException if parameter {@code host} is {@code null}
+        */
+       private FlinkJedisPoolConfig(String host, int port, int 
connectionTimeout, String password, int database,
+                                                               int maxTotal, 
int maxIdle, int minIdle) {
+               super(connectionTimeout, maxTotal, maxIdle, minIdle);
+               Preconditions.checkNotNull(host, "Host information should be 
presented");
+               this.host = host;
+               this.port = port;
+               this.database = database;
+               this.password = password;
+       }
+
+       /**
+        * Returns host.
+        *
+        * @return hostname or IP
+        */
+       public String getHost() {
+               return host;
+       }
+
+       /**
+        * Returns port.
+        *
+        * @return port
+        */
+       public int getPort() {
+               return port;
+       }
+
+
+       /**
+        * Returns database index.
+        *
+        * @return database index
+        */
+       public int getDatabase() {
+               return database;
+       }
+
+       /**
+        * Returns password.
+        *
+        * @return password, or {@code null} when none was set on the builder
+        */
+       public String getPassword() {
+               return password;
+       }
+
+       /**
+        * Builder for initializing  {@link FlinkJedisPoolConfig}.
+        *
+        * <p>Host and password have no default value; every other setting starts from a
+        * constant of {@code Protocol} or {@code GenericObjectPoolConfig}.
+        */
+       public static class Builder {
+               private String host;
+               private int port = Protocol.DEFAULT_PORT;
+               private int timeout = Protocol.DEFAULT_TIMEOUT;
+               private int database = Protocol.DEFAULT_DATABASE;
+               private String password;
+               private int maxTotal = 
GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+               private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+               private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+               /**
+                * Sets value for the {@code maxTotal} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxTotal the maximum number of objects that can be allocated by the pool,
+                *                 default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxTotal(int maxTotal) {
+                       this.maxTotal = maxTotal;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxIdle the cap on the number of "idle" instances in the pool,
+                *                default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxIdle(int maxIdle) {
+                       this.maxIdle = maxIdle;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code minIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param minIdle the minimum number of idle objects to maintain in the pool,
+                *                default value is 0
+                * @return Builder itself
+                */
+               public Builder setMinIdle(int minIdle) {
+                       this.minIdle = minIdle;
+                       return this;
+               }
+
+               /**
+                * Sets host.
+                *
+                * @param host host
+                * @return Builder itself
+                */
+               public Builder setHost(String host) {
+                       this.host = host;
+                       return this;
+               }
+
+               /**
+                * Sets port.
+                *
+                * @param port port, default value is 6379
+                * @return Builder itself
+                */
+               public Builder setPort(int port) {
+                       this.port = port;
+                       return this;
+               }
+
+               /**
+                * Sets timeout.
+                *
+                * @param timeout timeout, default value is 2000
+                * @return Builder itself
+                */
+               public Builder setTimeout(int timeout) {
+                       this.timeout = timeout;
+                       return this;
+               }
+
+               /**
+                * Sets database index.
+                *
+                * @param database database index, default value is 0
+                * @return Builder itself
+                */
+               public Builder setDatabase(int database) {
+                       this.database = database;
+                       return this;
+               }
+
+               /**
+                * Sets password.
+                *
+                * @param password password, if any
+                * @return Builder itself
+                */
+               public Builder setPassword(String password) {
+                       this.password = password;
+                       return this;
+               }
+
+
+               /**
+                * Builds JedisPoolConfig.
+                *
+                * @return JedisPoolConfig
+                * @throws NullPointerException if no host was set
+                */
+               public FlinkJedisPoolConfig build() {
+                       return new FlinkJedisPoolConfig(host, port, timeout, 
password, database, maxTotal, maxIdle, minIdle);
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "JedisPoolConfig{" +
+                       "host='" + host + '\'' +
+                       ", port=" + port +
+                       ", timeout=" + connectionTimeout +
+                       ", database=" + database +
+                       ", maxTotal=" + maxTotal +
+                       ", maxIdle=" + maxIdle +
+                       ", minIdle=" + minIdle +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
new file mode 100644
index 0000000..2cdb397
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel pool.
+ *
+ * <p>Instances are created through {@link Builder}; the constructor is private.
+ * The sentinel set is copied both when the configuration is created and when it is
+ * handed out by {@link #getSentinels()}, so the stored set cannot be changed from outside.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+       private final String masterName;
+       private final Set<String> sentinels;
+       private final int soTimeout;
+       private final String password;
+       private final int database;
+
+       /**
+        * Jedis Sentinels config.
+        * The master name and sentinels are mandatory, and when you didn't set these,
+        * it throws NullPointerException.
+        *
+        * @param masterName master name of the replica set
+        * @param sentinels set of sentinel hosts
+        * @param connectionTimeout connection timeout
+        * @param soTimeout socket timeout
+        * @param password password, if any
+        * @param database database index
+        * @param maxTotal the maximum number of objects that can be allocated by the pool
+        * @param maxIdle the cap on the number of "idle" instances in the pool
+        * @param minIdle the minimum number of idle objects to maintain in the pool
+        *
+        * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
+        * @throws IllegalArgumentException if {@code sentinels} are empty, or if the base class
+        *         rejects {@code connectionTimeout}, {@code maxTotal}, {@code maxIdle} or
+        *         {@code minIdle} because it is negative
+        */
+       private FlinkJedisSentinelConfig(String masterName, Set<String> 
sentinels,
+                                                                       int 
connectionTimeout, int soTimeout,
+                                                                       String 
password, int database,
+                                                                       int 
maxTotal, int maxIdle, int minIdle) {
+               super(connectionTimeout, maxTotal, maxIdle, minIdle);
+               Preconditions.checkNotNull(masterName, "Master name should be 
presented");
+               Preconditions.checkNotNull(sentinels, "Sentinels information 
should be presented");
+               Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel 
hosts should not be empty");
+
+               this.masterName = masterName;
+               this.sentinels = new HashSet<>(sentinels);
+               this.soTimeout = soTimeout;
+               this.password = password;
+               this.database = database;
+       }
+
+       /**
+        * Returns master name of the replica set.
+        *
+        * @return master name of the replica set.
+        */
+       public String getMasterName() {
+               return masterName;
+       }
+
+       /**
+        * Returns Sentinels host addresses.
+        *
+        * <p>The result is a fresh copy of the stored set, so modifying it does not
+        * change this configuration.
+        *
+        * @return Set of Sentinels host addresses
+        */
+       public Set<String> getSentinels() {
+               return new HashSet<>(sentinels);
+       }
+
+       /**
+        * Returns socket timeout.
+        *
+        * @return socket timeout
+        */
+       public int getSoTimeout() {
+               return soTimeout;
+       }
+
+       /**
+        * Returns password.
+        *
+        * @return password, or {@code null} when none was set on the builder
+        */
+       public String getPassword() {
+               return password;
+       }
+
+       /**
+        * Returns database index.
+        *
+        * @return database index
+        */
+       public int getDatabase() {
+               return database;
+       }
+
+       /**
+        * Builder for initializing {@link FlinkJedisSentinelConfig}.
+        *
+        * <p>Master name and sentinels have no default; {@link #build()} fails when
+        * either of them was not set.
+        */
+       public static class Builder {
+               private String masterName;
+               private Set<String> sentinels;
+               private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
+               private int soTimeout = Protocol.DEFAULT_TIMEOUT;
+               private String password;
+               private int database = Protocol.DEFAULT_DATABASE;
+               private int maxTotal = 
GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+               private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+               private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+               /**
+                * Sets master name of the replica set.
+                *
+                * @param masterName  master name of the replica set
+                * @return Builder itself
+                */
+               public Builder setMasterName(String masterName) {
+                       this.masterName = masterName;
+                       return this;
+               }
+
+               /**
+                * Sets sentinels address.
+                *
+                * @param sentinels host set of the sentinels
+                * @return Builder itself
+                */
+               public Builder setSentinels(Set<String> sentinels) {
+                       this.sentinels = sentinels;
+                       return this;
+               }
+
+               /**
+                * Sets connection timeout.
+                *
+                * @param connectionTimeout connection timeout, default value is 2000
+                * @return Builder itself
+                */
+               public Builder setConnectionTimeout(int connectionTimeout) {
+                       this.connectionTimeout = connectionTimeout;
+                       return this;
+               }
+
+               /**
+                * Sets socket timeout.
+                *
+                * @param soTimeout socket timeout, default value is 2000
+                * @return Builder itself
+                */
+               public Builder setSoTimeout(int soTimeout) {
+                       this.soTimeout = soTimeout;
+                       return this;
+               }
+
+               /**
+                * Sets password.
+                *
+                * @param password password, if any
+                * @return Builder itself
+                */
+               public Builder setPassword(String password) {
+                       this.password = password;
+                       return this;
+               }
+
+               /**
+                * Sets database index.
+                *
+                * @param database database index, default value is 0
+                * @return Builder itself
+                */
+               public Builder setDatabase(int database) {
+                       this.database = database;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxTotal} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxTotal the maximum number of objects that can be allocated by the pool,
+                *                 default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxTotal(int maxTotal) {
+                       this.maxTotal = maxTotal;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxIdle the cap on the number of "idle" instances in the pool,
+                *                default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxIdle(int maxIdle) {
+                       this.maxIdle = maxIdle;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code minIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param minIdle the minimum number of idle objects to maintain in the pool,
+                *                default value is 0
+                * @return Builder itself
+                */
+               public Builder setMinIdle(int minIdle) {
+                       this.minIdle = minIdle;
+                       return this;
+               }
+
+               /**
+                * Builds JedisSentinelConfig.
+                *
+                * @return JedisSentinelConfig
+                * @throws NullPointerException if master name or sentinels were not set
+                * @throws IllegalArgumentException if the sentinel set is empty or a numeric
+                *         setting is rejected by the base class
+                */
+               public FlinkJedisSentinelConfig build(){
+                       return new FlinkJedisSentinelConfig(masterName, 
sentinels, connectionTimeout, soTimeout,
+                               password, database, maxTotal, maxIdle, minIdle);
+               }
+       }
+
+       /**
+        * Describes this configuration without the password and the sentinel set.
+        */
+       @Override
+       public String toString() {
+               return "JedisSentinelConfig{" +
+                       "masterName='" + masterName + '\'' +
+                       ", connectionTimeout=" + connectionTimeout +
+                       ", soTimeout=" + soTimeout +
+                       ", database=" + database +
+                       ", maxTotal=" + maxTotal +
+                       ", maxIdle=" + maxIdle +
+                       ", minIdle=" + minIdle +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
new file mode 100644
index 0000000..d6621d6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+       private transient JedisCluster jedisCluster;
+
+       /**
+        * Initialize Redis command container for Redis cluster.
+        *
+        * @param jedisCluster JedisCluster instance
+        */
+       public RedisClusterContainer(JedisCluster jedisCluster) {
+               Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not 
be null");
+
+               this.jedisCluster = jedisCluster;
+       }
+
+       @Override
+       public void open() throws Exception {
+
+               // echo() tries to open a connection and echos back the
+               // message passed as argument. Here we use it to monitor
+               // if we can communicate with the cluster.
+
+               jedisCluster.echo("Test");
+       }
+
+       @Override
+       public void hset(final String key, final String hashField, final String 
value) {
+               try {
+                       jedisCluster.hset(key, hashField, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command HSET to hash {} error message {}",
+                                       key, hashField, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void rpush(final String listName, final String value) {
+               try {
+                       jedisCluster.rpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message: {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void lpush(String listName, String value) {
+               try {
+                       jedisCluster.lpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command LPUSH to list {} error message: {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void sadd(final String setName, final String value) {
+               try {
+                       jedisCluster.sadd(setName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to set {} error message {}",
+                                       setName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void publish(final String channelName, final String message) {
+               try {
+                       jedisCluster.publish(channelName, message);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PUBLISH to channel {} error message {}",
+                                       channelName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void set(final String key, final String value) {
+               try {
+                       jedisCluster.set(key, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command SET to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void pfadd(final String key, final String element) {
+               try {
+                       jedisCluster.set(key, element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PFADD to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void zadd(final String key, final String score, final String 
element) {
+               try {
+                       jedisCluster.zadd(key, Double.valueOf(score), element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command ZADD to set {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       /**
+        * Closes the {@link JedisCluster}.
+        */
+       @Override
+       public void close() throws IOException {
+               this.jedisCluster.close();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
new file mode 100644
index 0000000..55dbfc2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis commands: HSET, RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD and ZADD.
+ */
+public interface RedisCommandsContainer extends Serializable {
+
+       /**
+        * Open the Jedis container.
+        *
+        * @throws Exception if the instance can not be opened properly
+        */
+       void open() throws Exception;
+
+       /**
+        * Sets field in the hash stored at key to value.
+        * If key does not exist, a new key holding a hash is created.
+        * If field already exists in the hash, it is overwritten.
+        *
+        * @param key Hash name
+        * @param hashField Hash field
+        * @param value Hash value
+        */
+       void hset(String key, String hashField, String value);
+
+       /**
+        * Insert the specified value at the tail of the list stored at key.
+        * If key does not exist, it is created as empty list before performing the push operation.
+        *
+        * @param listName Name of the List
+        * @param value Value to be added
+        */
+       void rpush(String listName, String value);
+
+       /**
+        * Insert the specified value at the head of the list stored at key.
+        * If key does not exist, it is created as empty list before performing the push operation.
+        *
+        * @param listName Name of the List
+        * @param value Value to be added
+        */
+       void lpush(String listName, String value);
+
+       /**
+        * Add the specified member to the set stored at key.
+        * A member that is already part of this set is ignored.
+        * If key does not exist, a new set is created before adding the specified member.
+        *
+        * @param setName Name of the Set
+        * @param value Value to be added
+        */
+       void sadd(String setName, String value);
+
+       /**
+        * Posts a message to the given channel.
+        *
+        * @param channelName Name of the channel to which data will be published
+        * @param message the message
+        */
+       void publish(String channelName, String message);
+
+       /**
+        * Set key to hold the string value. If key already holds a value, it is overwritten,
+        * regardless of its type. Any previous time to live associated with the key is
+        * discarded on successful SET operation.
+        *
+        * @param key the key name in which value to be set
+        * @param value the value
+        */
+       void set(String key, String value);
+
+       /**
+        * Adds the element to the HyperLogLog data structure
+        * stored at the key given as first argument.
+        *
+        * @param key The name of the key
+        * @param element the element
+        */
+       void pfadd(String key, String element);
+
+       /**
+        * Adds the specified member with the specified score to the sorted set stored at key.
+        *
+        * @param key The name of the Sorted Set
+        * @param score Score of the element, passed as a string (the cluster container parses it as a double)
+        * @param element element to be added
+        */
+       void zadd(String key, String score, String element);
+
+       /**
+        * Close the Jedis container.
+        *
+        * @throws IOException if the instance can not be closed properly
+        */
+       void close() throws IOException;
+}

Reply via email to