http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java new file mode 100644 index 0000000..72bac1c --- /dev/null +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.rabbitmq.client.ConnectionFactory; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, + * Boolean, Boolean, Integer, Integer, Integer, Integer)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer, Boolean, + * Boolean, Integer, Integer, Integer, Integer)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private Integer networkRecoveryInterval; + private Boolean automaticRecovery; + private Boolean topologyRecovery; + + private Integer connectionTimeout; + private Integer requestedChannelMax; + private Integer requestedFrameMax; + private Integer requestedHeartbeat; + + /** + * + * @param host host name + * @param port port + * @param virtualHost virtual host + * @param username username + * @param password password + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax 
requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, Integer port, String virtualHost, String username, String password, + Integer networkRecoveryInterval, Boolean automaticRecovery, + Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax, + Integer requestedFrameMax, Integer requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(port, "port can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** + * + * @param uri the connection URI + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if URI is null + */ + private RMQConnectionConfig(String uri, Integer networkRecoveryInterval, Boolean automaticRecovery, + Boolean topologyRecovery, Integer connectionTimeout, Integer 
requestedChannelMax, + Integer requestedFrameMax, Integer requestedHeartbeat){ + Preconditions.checkNotNull(uri, "Uri can not be null"); + this.uri = uri; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** @return the host to use for connections */ + public String getHost() { + return host; + } + + /** @return the port to use for connections */ + public int getPort() { + return port; + } + + /** + * Retrieve the virtual host. + * @return the virtual host to use when connecting to the broker + */ + public String getVirtualHost() { + return virtualHost; + } + + /** + * Retrieve the user name. + * @return the AMQP user name to use when connecting to the broker + */ + public String getUsername() { + return username; + } + + /** + * Retrieve the password. + * @return the password to use when connecting to the broker + */ + public String getPassword() { + return password; + } + + /** + * Retrieve the URI. + * @return the connection URI when connecting to the broker + */ + public String getUri() { + return uri; + } + + /** + * Returns automatic connection recovery interval in milliseconds. 
+ * @return how long will automatic recovery wait before attempting to reconnect, in ms; default is 5000 + */ + public Integer getNetworkRecoveryInterval() { + return networkRecoveryInterval; + } + + /** + * Returns true if automatic connection recovery is enabled, false otherwise + * @return true if automatic connection recovery is enabled, false otherwise + */ + public Boolean isAutomaticRecovery() { + return automaticRecovery; + } + + /** + * Returns true if topology recovery is enabled, false otherwise + * @return true if topology recovery is enabled, false otherwise + */ + public Boolean isTopologyRecovery() { + return topologyRecovery; + } + + /** + * Retrieve the connection timeout. + * @return the connection timeout, in milliseconds; zero for infinite + */ + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Retrieve the requested maximum channel number + * @return the initially requested maximum channel number; zero for unlimited + */ + public Integer getRequestedChannelMax() { + return requestedChannelMax; + } + + /** + * Retrieve the requested maximum frame size + * @return the initially requested maximum frame size, in octets; zero for unlimited + */ + public Integer getRequestedFrameMax() { + return requestedFrameMax; + } + + /** + * Retrieve the requested heartbeat interval. 
+ * @return the initially requested heartbeat interval, in seconds; zero for none + */ + public Integer getRequestedHeartbeat() { + return requestedHeartbeat; + } + + /** + * + * @return Connection Factory for RMQ + * @throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException if Malformed URI has been passed + */ + public ConnectionFactory getConnectionFactory() throws URISyntaxException, + NoSuchAlgorithmException, KeyManagementException { + ConnectionFactory factory = new ConnectionFactory(); + if (this.uri != null && !this.uri.isEmpty()){ + try { + factory.setUri(this.uri); + } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) { + LOG.error("Failed to parse uri", e); + throw e; + } + } else { + factory.setHost(this.host); + factory.setPort(this.port); + factory.setVirtualHost(this.virtualHost); + factory.setUsername(this.username); + factory.setPassword(this.password); + } + + if (this.automaticRecovery != null) { + factory.setAutomaticRecoveryEnabled(this.automaticRecovery); + } + if (this.connectionTimeout != null) { + factory.setConnectionTimeout(this.connectionTimeout); + } + if (this.networkRecoveryInterval != null) { + factory.setNetworkRecoveryInterval(this.networkRecoveryInterval); + } + if (this.requestedHeartbeat != null) { + factory.setRequestedHeartbeat(this.requestedHeartbeat); + } + if (this.topologyRecovery != null) { + factory.setTopologyRecoveryEnabled(this.topologyRecovery); + } + if (this.requestedChannelMax != null) { + factory.setRequestedChannelMax(this.requestedChannelMax); + } + if (this.requestedFrameMax != null) { + factory.setRequestedFrameMax(this.requestedFrameMax); + } + + return factory; + } + + /** + * The Builder Class for {@link RMQConnectionConfig} + */ + public static class Builder { + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + + private Integer networkRecoveryInterval; + private Boolean 
automaticRecovery; + private Boolean topologyRecovery; + + private Integer connectionTimeout; + private Integer requestedChannelMax; + private Integer requestedFrameMax; + private Integer requestedHeartbeat; + + private String uri; + + /** + * Set the target port. + * @param port the default port to use for connections + * @return the Builder + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** @param host the default host to use for connections + * @return the Builder + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Set the virtual host. + * @param virtualHost the virtual host to use when connecting to the broker + * @return the Builder + */ + public Builder setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + return this; + } + + /** + * Set the user name. + * @param username the AMQP user name to use when connecting to the broker + * @return the Builder + */ + public Builder setUserName(String username) { + this.username = username; + return this; + } + + /** + * Set the password. + * @param password the password to use when connecting to the broker + * @return the Builder + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Convenience method for setting the fields in an AMQP URI: host, + * port, username, password and virtual host. If any part of the + * URI is ommited, the ConnectionFactory's corresponding variable + * is left unchanged. 
+ * @param uri is the AMQP URI containing the data + * @return the Builder + */ + public Builder setUri(String uri) { + this.uri = uri; + return this; + } + + /** + * Enables or disables topology recovery + * @param topologyRecovery if true, enables topology recovery + * @return the Builder + */ + public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) { + this.topologyRecovery = topologyRecovery; + return this; + } + + /** + * Set the requested heartbeat. + * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero for none + * @return the Builder + */ + public Builder setRequestedHeartbeat(int requestedHeartbeat) { + this.requestedHeartbeat = requestedHeartbeat; + return this; + } + + /** + * Set the requested maximum frame size + * @param requestedFrameMax initially requested maximum frame size, in octets; zero for unlimited + * @return the Builder + */ + public Builder setRequestedFrameMax(int requestedFrameMax) { + this.requestedFrameMax = requestedFrameMax; + return this; + } + + /** + * Set the requested maximum channel number + * @param requestedChannelMax initially requested maximum channel number; zero for unlimited + */ + public Builder setRequestedChannelMax(int requestedChannelMax) { + this.requestedChannelMax = requestedChannelMax; + return this; + } + + /** + * Sets connection recovery interval. Default is 5000. + * @param networkRecoveryInterval how long will automatic recovery wait before attempting to reconnect, in ms + * @return the Builder + */ + public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) { + this.networkRecoveryInterval = networkRecoveryInterval; + return this; + } + + /** + * Set the connection timeout. 
+ * @param connectionTimeout connection establishment timeout in milliseconds; zero for infinite + * @return the Builder + */ + public Builder setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Enables or disables automatic connection recovery + * @param automaticRecovery if true, enables connection recovery + * @return the Builder + */ + public Builder setAutomaticRecovery(boolean automaticRecovery) { + this.automaticRecovery = automaticRecovery; + return this; + } + + /** + * The Builder method + * If URI is NULL we use host, port, vHost, username, password combination + * to initialize connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, + * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)} + * + * else URI will be used to initialize the client connection + * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)} + * @return RMQConnectionConfig + */ + public RMQConnectionConfig build(){ + if(this.uri != null) { + return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval, + this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax, + this.requestedFrameMax, this.requestedHeartbeat); + } else { + return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password, + this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery, + this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java new file mode 100644 index 0000000..b63c835 --- /dev/null +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.QueueingConsumer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.SerializedCheckpointData; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.modules.junit4.PowerMockRunner; +import com.rabbitmq.client.Connection; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + + +/** + * Tests for the RMQSource. The source supports two operation modes. + * 1) Exactly-once (when checkpointed) with RabbitMQ transactions and the deduplication mechanism in + * {@link org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase}. 
+ * 2) At-least-once (when checkpointed) with RabbitMQ transactions but not deduplication. + * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode. + * + * This tests assumes that the message ids are increasing monotonously. That doesn't have to be the + * case. The correlation id is used to uniquely identify messages. + */ +@RunWith(PowerMockRunner.class) +public class RMQSourceTest { + + private RMQSource<String> source; + + private Configuration config = new Configuration(); + + private Thread sourceThread; + + private volatile long messageId; + + private boolean generateCorrelationIds; + + private volatile Exception exception; + + @Before + public void beforeTest() throws Exception { + + source = new RMQTestSource(); + source.open(config); + + messageId = 0; + generateCorrelationIds = true; + + sourceThread = new Thread(new Runnable() { + @Override + public void run() { + try { + source.run(new DummySourceContext()); + } catch (Exception e) { + exception = e; + } + } + }); + } + + @After + public void afterTest() throws Exception { + source.cancel(); + sourceThread.join(); + } + + @Test + public void throwExceptionIfConnectionFactoryReturnNull() throws Exception { + RMQConnectionConfig connectionConfig = Mockito.mock(RMQConnectionConfig.class); + ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class); + Connection connection = Mockito.mock(Connection.class); + Mockito.when(connectionConfig.getConnectionFactory()).thenReturn(connectionFactory); + Mockito.when(connectionFactory.newConnection()).thenReturn(connection); + Mockito.when(connection.createChannel()).thenReturn(null); + + RMQSource<String> rmqSource = new RMQSource<>( + connectionConfig, "queueDummy", true, new StringDeserializationScheme()); + try { + rmqSource.open(new Configuration()); + } catch (RuntimeException ex) { + assertEquals("None of RabbitMQ channels are available", ex.getMessage()); + } + } + + @Test + public void testCheckpointing() 
throws Exception { + source.autoAck = false; + sourceThread.start(); + + Thread.sleep(5); + + final Random random = new Random(System.currentTimeMillis()); + int numSnapshots = 50; + long previousSnapshotId; + long lastSnapshotId = 0; + + long totalNumberOfAcks = 0; + + for (int i=0; i < numSnapshots; i++) { + long snapshotId = random.nextLong(); + SerializedCheckpointData[] data; + + synchronized (DummySourceContext.lock) { + data = source.snapshotState(snapshotId, System.currentTimeMillis()); + previousSnapshotId = lastSnapshotId; + lastSnapshotId = messageId; + } + // let some time pass + Thread.sleep(5); + + // check if the correct number of messages have been snapshotted + final long numIds = lastSnapshotId - previousSnapshotId; + assertEquals(numIds, data[0].getNumIds()); + // deserialize and check if the last id equals the last snapshotted id + ArrayDeque<Tuple2<Long, List<String>>> deque = SerializedCheckpointData.toDeque(data, new StringSerializer()); + List<String> messageIds = deque.getLast().f1; + if (messageIds.size() > 0) { + assertEquals(lastSnapshotId, (long) Long.valueOf(messageIds.get(messageIds.size() - 1))); + } + + // check if the messages are being acknowledged and the transaction comitted + synchronized (DummySourceContext.lock) { + source.notifyCheckpointComplete(snapshotId); + } + totalNumberOfAcks += numIds; + + } + + Mockito.verify(source.channel, Mockito.times((int) totalNumberOfAcks)).basicAck(Mockito.anyLong(), Mockito.eq(false)); + Mockito.verify(source.channel, Mockito.times(numSnapshots)).txCommit(); + + } + + /** + * Checks whether recurring ids are processed again (they shouldn't be). 
+ */ + @Test + public void testDuplicateId() throws Exception { + source.autoAck = false; + sourceThread.start(); + + while (messageId < 10) { + // wait until messages have been processed + Thread.sleep(5); + } + + long oldMessageId; + synchronized (DummySourceContext.lock) { + oldMessageId = messageId; + messageId = 0; + } + + while (messageId < 10) { + // process again + Thread.sleep(5); + } + + synchronized (DummySourceContext.lock) { + assertEquals(Math.max(messageId, oldMessageId), DummySourceContext.numElementsCollected); + } + } + + + /** + * The source should not acknowledge ids in auto-commit mode or check for previously acknowledged ids + */ + @Test + public void testCheckpointingDisabled() throws Exception { + source.autoAck = true; + sourceThread.start(); + + while (DummySourceContext.numElementsCollected < 50) { + // wait until messages have been processed + Thread.sleep(5); + } + + // see addId in RMQTestSource.addId for the assert + } + + /** + * Tests error reporting in case of invalid correlation ids + */ + @Test + public void testCorrelationIdNotSet() throws InterruptedException { + generateCorrelationIds = false; + source.autoAck = false; + sourceThread.start(); + + sourceThread.join(); + + assertNotNull(exception); + assertTrue(exception instanceof NullPointerException); + } + + /** + * Tests whether constructor params are passed correctly. 
+ */ + @Test + public void testConstructorParams() throws Exception { + // verify construction params + RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder(); + builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/"); + ConstructorTestClass testObj = new ConstructorTestClass( + builder.build(), "queueTest", false, new StringDeserializationScheme()); + + try { + testObj.open(new Configuration()); + } catch (Exception e) { + // connection fails but check if args have been passed correctly + } + + assertEquals("hostTest", testObj.getFactory().getHost()); + assertEquals(999, testObj.getFactory().getPort()); + assertEquals("userTest", testObj.getFactory().getUsername()); + assertEquals("passTest", testObj.getFactory().getPassword()); + } + + private static class ConstructorTestClass extends RMQSource<String> { + + private ConnectionFactory factory; + + public ConstructorTestClass(RMQConnectionConfig rmqConnectionConfig, + String queueName, + boolean usesCorrelationId, + DeserializationSchema<String> deserializationSchema) throws Exception { + super(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema); + RMQConnectionConfig.Builder builder = new RMQConnectionConfig.Builder(); + builder.setHost("hostTest").setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/"); + factory = Mockito.spy(builder.build().getConnectionFactory()); + try { + Mockito.doThrow(new RuntimeException()).when(factory).newConnection(); + } catch (IOException e) { + fail("Failed to stub connection method"); + } + } + + @Override + protected ConnectionFactory setupConnectionFactory() { + return factory; + } + + public ConnectionFactory getFactory() { + return factory; + } + } + + private static class StringDeserializationScheme implements DeserializationSchema<String> { + + @Override + public String deserialize(byte[] message) throws IOException { + try { + // wait a bit to not cause too 
much cpu load + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new String(message); + } + + @Override + public boolean isEndOfStream(String nextElement) { + return false; + } + + @Override + public TypeInformation<String> getProducedType() { + return TypeExtractor.getForClass(String.class); + } + } + + private class RMQTestSource extends RMQSource<String> { + + public RMQTestSource() { + super(new RMQConnectionConfig.Builder().setHost("hostTest") + .setPort(999).setUserName("userTest").setPassword("passTest").setVirtualHost("/").build() + , "queueDummy", true, new StringDeserializationScheme()); + } + + @Override + public void open(Configuration config) throws Exception { + super.open(config); + + consumer = Mockito.mock(QueueingConsumer.class); + + // Mock for delivery + final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class); + Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes()); + + try { + Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock); + } catch (InterruptedException e) { + fail("Couldn't setup up deliveryMock"); + } + + // Mock for envelope + Envelope envelope = Mockito.mock(Envelope.class); + Mockito.when(deliveryMock.getEnvelope()).thenReturn(envelope); + + Mockito.when(envelope.getDeliveryTag()).thenAnswer(new Answer<Long>() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + return ++messageId; + } + }); + + // Mock for properties + AMQP.BasicProperties props = Mockito.mock(AMQP.BasicProperties.class); + Mockito.when(deliveryMock.getProperties()).thenReturn(props); + + Mockito.when(props.getCorrelationId()).thenAnswer(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return generateCorrelationIds ? 
"" + messageId : null; + } + }); + + } + + @Override + protected ConnectionFactory setupConnectionFactory() { + ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class); + Connection connection = Mockito.mock(Connection.class); + try { + Mockito.when(connectionFactory.newConnection()).thenReturn(connection); + Mockito.when(connection.createChannel()).thenReturn(Mockito.mock(Channel.class)); + } catch (IOException e) { + fail("Test environment couldn't be created."); + } + return connectionFactory; + } + + @Override + public RuntimeContext getRuntimeContext() { + return Mockito.mock(StreamingRuntimeContext.class); + } + + @Override + protected boolean addId(String uid) { + assertEquals(false, autoAck); + return super.addId(uid); + } + } + + private static class DummySourceContext implements SourceFunction.SourceContext<String> { + + private static final Object lock = new Object(); + + private static long numElementsCollected; + + public DummySourceContext() { + numElementsCollected = 0; + } + + @Override + public void collect(String element) { + numElementsCollected++; + } + + @Override + public void collectWithTimestamp(java.lang.String element, long timestamp) { + } + + @Override + public void emitWatermark(Watermark mark) { + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java new file mode 100644 index 0000000..40985ce --- 
/dev/null +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfigTest.java @@ -0,0 +1,69 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.rabbitmq.common;

import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

import static org.junit.Assert.assertEquals;


/**
 * Tests for {@link RMQConnectionConfig}: mandatory-field validation in the builder and
 * propagation of the connection timeout to the created {@link ConnectionFactory}.
 */
public class RMQConnectionConfigTest {

    /** Building without a host must be rejected. */
    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointExceptionIfHostIsNull() throws NoSuchAlgorithmException,
            KeyManagementException, URISyntaxException {
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setPort(1000).setUserName("guest")
            .setPassword("guest").setVirtualHost("/").build();
        connectionConfig.getConnectionFactory();
    }

    /** Building without a port must be rejected. */
    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointExceptionIfPortIsNull() throws NoSuchAlgorithmException,
            KeyManagementException, URISyntaxException {
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost").setUserName("guest")
            .setPassword("guest").setVirtualHost("/").build();
        connectionConfig.getConnectionFactory();
    }

    /**
     * If no connection timeout is configured, the factory must keep the client's default.
     *
     * <p>The configuration has to be complete (including the port) and the test must not expect
     * an exception; otherwise build() fails first and the timeout assertion is never reached.
     */
    @Test
    public void shouldSetDefaultValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
            KeyManagementException, URISyntaxException {
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost").setPort(5000).setUserName("guest")
            .setPassword("guest").setVirtualHost("/").build();
        ConnectionFactory factory = connectionConfig.getConnectionFactory();
        assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, factory.getConnectionTimeout());
    }

    /**
     * If a connection timeout is configured, it must be applied to the factory.
     * (Despite the "NotGiven" in the method name, this is the case where the timeout IS given.)
     */
    @Test
    public void shouldSetProvidedValueIfConnectionTimeoutNotGiven() throws NoSuchAlgorithmException,
            KeyManagementException, URISyntaxException {
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost").setPort(5000).setUserName("guest")
            .setPassword("guest").setVirtualHost("/")
            .setConnectionTimeout(5000).build();
        ConnectionFactory factory = connectionConfig.getConnectionFactory();
        assertEquals(5000, factory.getConnectionTimeout());
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java new file mode 100644 index 0000000..199cd1e --- /dev/null +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +public class RMQSinkTest { + + private static final String QUEUE_NAME = "queue"; + private static final String MESSAGE_STR = "msg"; + private static final byte[] MESSAGE = new byte[1]; + + private RMQConnectionConfig rmqConnectionConfig; + private ConnectionFactory connectionFactory; + private Connection connection; + private Channel channel; + private SerializationSchema<String> serializationSchema; + + + @Before + public void before() throws Exception { + serializationSchema = spy(new DummySerializationSchema()); + rmqConnectionConfig = mock(RMQConnectionConfig.class); + connectionFactory = mock(ConnectionFactory.class); + connection = mock(Connection.class); + 
channel = mock(Channel.class); + + when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory); + when(connectionFactory.newConnection()).thenReturn(connection); + when(connection.createChannel()).thenReturn(channel); + } + + @Test + public void openCallDeclaresQueue() throws Exception { + createRMQSink(); + + verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null); + } + + /** + * Opening the sink must fail with a descriptive RuntimeException when the connection + * hands out no channel. The test fails explicitly if no exception is thrown at all, + * instead of passing vacuously. + */ + @Test + public void throwExceptionIfChannelIsNull() throws Exception { + when(connection.createChannel()).thenReturn(null); + try { + createRMQSink(); + } catch (RuntimeException ex) { + assertEquals("None of RabbitMQ channels are available", ex.getMessage()); + return; + } + throw new AssertionError("Expected a RuntimeException because no RabbitMQ channel is available"); + } + + /** + * Creates a sink wired to the mocked connection config and opens it. + */ + private RMQSink<String> createRMQSink() throws Exception { + RMQSink<String> rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema); + rmqSink.open(new Configuration()); + return rmqSink; + } + + @Test + public void invokePublishBytesToQueue() throws Exception { + RMQSink<String> rmqSink = createRMQSink(); + + rmqSink.invoke(MESSAGE_STR); + verify(serializationSchema).serialize(MESSAGE_STR); + verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE); + } + + @Test(expected = RuntimeException.class) + public void exceptionDuringPublishingIsNotIgnored() throws Exception { + RMQSink<String> rmqSink = createRMQSink(); + + doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE); + rmqSink.invoke("msg"); + } + + @Test + public void exceptionDuringPublishingIsIgnoredIfLogFailuresOnly() throws Exception { + RMQSink<String> rmqSink = createRMQSink(); + rmqSink.setLogFailuresOnly(true); + + doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE); + rmqSink.invoke("msg"); + } + + @Test + public void closeAllResources() throws Exception { + RMQSink<String> rmqSink = createRMQSink(); + + rmqSink.close(); + + verify(channel).close(); + verify(connection).close(); + } + + private class DummySerializationSchema implements 
SerializationSchema<String> { + @Override + public byte[] serialize(String element) { + return MESSAGE; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/pom.xml b/flink-connectors/flink-connector-redis/pom.xml new file mode 100644 index 0000000..a348f31 --- /dev/null +++ b/flink-connectors/flink-connector-redis/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-redis_2.10</artifactId> + <name>flink-connector-redis</name> + + <packaging>jar</packaging> + + <properties> + <jedis.version>2.8.0</jedis.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + <version>${jedis.version}</version> + </dependency> + + <dependency> + <groupId>com.github.kstyrc</groupId> + <artifactId>embedded-redis</artifactId> + <version>0.6</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java new file mode 100644 index 0000000..f6b0fd7 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. 
+ * + * <p>Example: + * + * <pre> + *{@code + *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> { + * + * private RedisCommand redisCommand; + * + * public RedisExampleMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2<String, String> data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2<String, String> data) { + * return data.f1; + * } + *} + *FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder() + * .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink<Tuple2<String, String>>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH)); + *}</pre> + * + * @param <IN> Type of the elements emitted by this sink + */ +public class RedisSink<IN> extends RichSinkFunction<IN> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** + * This additional key is needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. + * Other {@link RedisDataType}s work only with two variables, i.e. name of the list and value to be added. + * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. + * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. + * {@code additionalKey} used as hash name for {@link RedisDataType#HASH} + * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and its score. 
+ * {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET} + */ + private String additionalKey; + private RedisMapper<IN> redisSinkMapper; + private RedisCommand redisCommand; + + private FlinkJedisConfigBase flinkJedisConfigBase; + private RedisCommandsContainer redisCommandsContainer; + + /** + * Creates a new {@link RedisSink} that connects to the Redis server. + * + * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase} + * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements. + */ + public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) { + Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null"); + Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null"); + Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null"); + + this.flinkJedisConfigBase = flinkJedisConfigBase; + + this.redisSinkMapper = redisSinkMapper; + RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription(); + this.redisCommand = redisCommandDescription.getCommand(); + this.additionalKey = redisCommandDescription.getAdditionalKey(); + } + + /** + * Called when new data arrives to the sink, and forwards it to Redis channel. + * Depending on the specified Redis data type (see {@link RedisDataType}), + * a different Redis command will be applied. + * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD. 
+ * + * @param input The incoming data + */ + @Override + public void invoke(IN input) throws Exception { + String key = redisSinkMapper.getKeyFromData(input); + String value = redisSinkMapper.getValueFromData(input); + + switch (redisCommand) { + case RPUSH: + this.redisCommandsContainer.rpush(key, value); + break; + case LPUSH: + this.redisCommandsContainer.lpush(key, value); + break; + case SADD: + this.redisCommandsContainer.sadd(key, value); + break; + case SET: + this.redisCommandsContainer.set(key, value); + break; + case PFADD: + this.redisCommandsContainer.pfadd(key, value); + break; + case PUBLISH: + this.redisCommandsContainer.publish(key, value); + break; + case ZADD: + this.redisCommandsContainer.zadd(this.additionalKey, value, key); + break; + case HSET: + this.redisCommandsContainer.hset(this.additionalKey, key, value); + break; + default: + throw new IllegalArgumentException("Cannot process such data type: " + redisCommand); + } + } + + /** + * Initializes the connection to Redis by either cluster or sentinels or single server. + * + * @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null + */ + @Override + public void open(Configuration parameters) throws Exception { + try { + this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); + this.redisCommandsContainer.open(); + } catch (Exception e) { + LOG.error("Redis has not been properly initialized: ", e); + throw e; + } + } + + /** + * Closes commands container. + * @throws IOException if command container is unable to close. 
+ */ + @Override + public void close() throws IOException { + if (redisCommandsContainer != null) { + redisCommandsContainer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java new file mode 100644 index 0000000..6e6cfe5 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private final Set<InetSocketAddress> nodes; + private final int maxRedirections; + + + /** + * Jedis cluster configuration. + * The list of node is mandatory, and when nodes is not set, it throws NullPointerException. + * + * @param nodes list of node information for JedisCluster + * @param connectionTimeout socket / connection timeout. The default is 2000 + * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK + * @param maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * @throws NullPointerException if parameter {@code nodes} is {@code null} + */ + private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + + Preconditions.checkNotNull(nodes, "Node information should be presented"); + Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty"); + this.nodes = new HashSet<>(nodes); + this.maxRedirections = maxRedirections; + } + + + + /** + * Returns nodes. 
+ * + * @return list of node information + */ + public Set<HostAndPort> getNodes() { + Set<HostAndPort> ret = new HashSet<>(); + for (InetSocketAddress node : nodes) { + ret.add(new HostAndPort(node.getHostName(), node.getPort())); + } + return ret; + } + + /** + * Returns limit of redirection. + * + * @return limit of redirection + */ + public int getMaxRedirections() { + return maxRedirections; + } + + + /** + * Builder for initializing {@link FlinkJedisClusterConfig}. + */ + public static class Builder { + private Set<InetSocketAddress> nodes; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int maxRedirections = 5; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets list of node. + * + * @param nodes list of node + * @return Builder itself + */ + public Builder setNodes(Set<InetSocketAddress> nodes) { + this.nodes = nodes; + return this; + } + + /** + * Sets socket / connection timeout. + * + * @param timeout socket / connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets limit of redirection. + * + * @param maxRedirections limit of redirection, default value is 5 + * @return Builder itself + */ + public Builder setMaxRedirections(int maxRedirections) { + this.maxRedirections = maxRedirections; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. 
+ * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Builds JedisClusterConfig. + * + * @return JedisClusterConfig + */ + public FlinkJedisClusterConfig build() { + return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisClusterConfig{" + + "nodes=" + nodes + + ", timeout=" + connectionTimeout + + ", maxRedirections=" + maxRedirections + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java new file mode 100644 index 0000000..a2489b8 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. 
+ */ +public abstract class FlinkJedisConfigBase implements Serializable { + private static final long serialVersionUID = 1L; + + protected final int maxTotal; + protected final int maxIdle; + protected final int minIdle; + protected final int connectionTimeout; + + protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){ + Preconditions.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative"); + Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can not be negative"); + Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can not be negative"); + Preconditions.checkArgument(minIdle >= 0, "minIdle value can not be negative"); + this.connectionTimeout = connectionTimeout; + this.maxTotal = maxTotal; + this.maxIdle = maxIdle; + this.minIdle = minIdle; + } + + /** + * Returns timeout. + * + * @return connection timeout + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Get the value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @return The current setting of {@code maxTotal} for this + * configuration instance + * @see GenericObjectPoolConfig#getMaxTotal() + */ + public int getMaxTotal() { + return maxTotal; + } + + /** + * Get the value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @return The current setting of {@code maxIdle} for this + * configuration instance + * @see GenericObjectPoolConfig#getMaxIdle() + */ + public int getMaxIdle() { + return maxIdle; + } + + /** + * Get the value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. 
+ * + * @return The current setting of {@code minIdle} for this + * configuration instance + * @see GenericObjectPoolConfig#getMinIdle() + */ + public int getMinIdle() { + return minIdle; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java new file mode 100644 index 0000000..d261a35 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.Protocol; + +/** + * Configuration for Jedis pool. + */ +public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { + + private static final long serialVersionUID = 1L; + + private final String host; + private final int port; + private final int database; + private final String password; + + + /** + * Jedis pool configuration. + * The host is mandatory, and when host is not set, it throws NullPointerException. + * + * @param host hostname or IP + * @param port port, default value is 6379 + * @param connectionTimeout socket / connection timeout, default value is 2000 milli second + * @param password password, if any + * @param database database index + * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @throws NullPointerException if parameter {@code host} is {@code null} + */ + private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + Preconditions.checkNotNull(host, "Host information should be presented"); + this.host = host; + this.port = port; + this.database = database; + this.password = password; + } + + /** + * Returns host. + * + * @return hostname or IP + */ + public String getHost() { + return host; + } + + /** + * Returns port. + * + * @return port + */ + public int getPort() { + return port; + } + + + /** + * Returns database index. + * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** + * Returns password. 
+ * + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Builder for initializing {@link FlinkJedisPoolConfig}. + */ + public static class Builder { + private String host; + private int port = Protocol.DEFAULT_PORT; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int database = Protocol.DEFAULT_DATABASE; + private String password; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Sets host. + * + * @param host host + * @return Builder itself + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Sets port. 
+ * + * @param port port, default value is 6379 + * @return Builder itself + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** + * Sets timeout. + * + * @param timeout timeout, default value is 2000 + * @return Builder itself + */ + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * Sets database index. + * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + + /** + * Builds JedisPoolConfig. + * + * @return JedisPoolConfig + */ + public FlinkJedisPoolConfig build() { + return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisPoolConfig{" + + "host='" + host + '\'' + + ", port=" + port + + ", timeout=" + connectionTimeout + + ", database=" + database + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java new file mode 100644 index 0000000..2cdb397 --- /dev/null +++ 
b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Sentinel pool. + */ +public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class); + + private final String masterName; + private final Set<String> sentinels; + private final int soTimeout; + private final String password; + private final int database; + + /** + * Jedis Sentinels config. + * The master name and sentinels are mandatory; if either of them is not set, a NullPointerException is thrown. 
+ * + * @param masterName master name of the replica set + * @param sentinels set of sentinel hosts + * @param connectionTimeout connection timeout + * @param soTimeout socket timeout + * @param password password, if any + * @param database database index + * @param maxTotal the maximum number of objects that can be allocated by the pool + * @param maxIdle the cap on the number of "idle" instances in the pool + * @param minIdle the minimum number of idle objects to maintain in the pool + * + * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null} + * @throws IllegalArgumentException if {@code sentinels} is empty + */ + private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels, + int connectionTimeout, int soTimeout, + String password, int database, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + Preconditions.checkNotNull(masterName, "Master name should be presented"); + Preconditions.checkNotNull(sentinels, "Sentinels information should be presented"); + Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty"); + + this.masterName = masterName; + this.sentinels = new HashSet<>(sentinels); + this.soTimeout = soTimeout; + this.password = password; + this.database = database; + } + + /** + * Returns master name of the replica set. + * + * @return master name of the replica set. + */ + public String getMasterName() { + return masterName; + } + + /** + * Returns Sentinels host addresses. + * + * @return Set of Sentinels host addresses + */ + public Set<String> getSentinels() { + return sentinels; + } + + /** + * Returns socket timeout. + * + * @return socket timeout + */ + public int getSoTimeout() { + return soTimeout; + } + + /** + * Returns password. + * + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Returns database index. 
+ * + * @return database index + */ + public int getDatabase() { + return database; + } + + /** + * Builder for initializing {@link FlinkJedisSentinelConfig}. + */ + public static class Builder { + private String masterName; + private Set<String> sentinels; + private int connectionTimeout = Protocol.DEFAULT_TIMEOUT; + private int soTimeout = Protocol.DEFAULT_TIMEOUT; + private String password; + private int database = Protocol.DEFAULT_DATABASE; + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + + /** + * Sets master name of the replica set. + * + * @param masterName master name of the replica set + * @return Builder itself + */ + public Builder setMasterName(String masterName) { + this.masterName = masterName; + return this; + } + + /** + * Sets sentinels address. + * + * @param sentinels host set of the sentinels + * @return Builder itself + */ + public Builder setSentinels(Set<String> sentinels) { + this.sentinels = sentinels; + return this; + } + + /** + * Sets connection timeout. + * + * @param connectionTimeout connection timeout, default value is 2000 + * @return Builder itself + */ + public Builder setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Sets socket timeout. + * + * @param soTimeout socket timeout, default value is 2000 + * @return Builder itself + */ + public Builder setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + return this; + } + + /** + * Sets password. + * + * @param password password, if any + * @return Builder itself + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Sets database index. 
+ * + * @param database database index, default value is 0 + * @return Builder itself + */ + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + /** + * Sets value for the {@code maxTotal} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + return this; + } + + /** + * Sets value for the {@code maxIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 + * @return Builder itself + */ + public Builder setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + return this; + } + + /** + * Sets value for the {@code minIdle} configuration attribute + * for pools to be created with this configuration instance. + * + * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0 + * @return Builder itself + */ + public Builder setMinIdle(int minIdle) { + this.minIdle = minIdle; + return this; + } + + /** + * Builds the {@link FlinkJedisSentinelConfig} from the values set on this builder. 
+ * + * @return the configured FlinkJedisSentinelConfig + */ + public FlinkJedisSentinelConfig build(){ + return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout, + password, database, maxTotal, maxIdle, minIdle); + } + } + + @Override + public String toString() { + return "JedisSentinelConfig{" + + "masterName='" + masterName + '\'' + + ", connectionTimeout=" + connectionTimeout + + ", soTimeout=" + soTimeout + + ", database=" + database + + ", maxTotal=" + maxTotal + + ", maxIdle=" + maxIdle + + ", minIdle=" + minIdle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java new file mode 100644 index 0000000..d6621d6 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private transient JedisCluster jedisCluster; + + /** + * Initialize Redis command container for Redis cluster. + * + * @param jedisCluster JedisCluster instance + */ + public RedisClusterContainer(JedisCluster jedisCluster) { + Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null"); + + this.jedisCluster = jedisCluster; + } + + @Override + public void open() throws Exception { + + // echo() tries to open a connection and echos back the + // message passed as argument. Here we use it to monitor + // if we can communicate with the cluster. 
+ + jedisCluster.echo("Test"); + } + + @Override + public void hset(final String key, final String hashField, final String value) { + try { + jedisCluster.hset(key, hashField, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to hash {} error message {}", + key, hashField, e.getMessage()); + } + throw e; + } + } + + @Override + public void rpush(final String listName, final String value) { + try { + jedisCluster.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}", + listName, e.getMessage()); + } + throw e; + } + } + + @Override + public void lpush(String listName, String value) { + try { + jedisCluster.lpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}", + listName, e.getMessage()); + } + throw e; + } + } + + @Override + public void sadd(final String setName, final String value) { + try { + jedisCluster.sadd(setName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}", + setName, e.getMessage()); + } + throw e; + } + } + + @Override + public void publish(final String channelName, final String message) { + try { + jedisCluster.publish(channelName, message); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}", + channelName, e.getMessage()); + } + throw e; + } + } + + @Override + public void set(final String key, final String value) { + try { + jedisCluster.set(key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SET to key {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + @Override + 
public void pfadd(final String key, final String element) { + try { + jedisCluster.set(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PFADD to key {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + @Override + public void zadd(final String key, final String score, final String element) { + try { + jedisCluster.zadd(key, Double.valueOf(score), element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command ZADD to set {} error message {}", + key, e.getMessage()); + } + throw e; + } + } + + /** + * Closes the {@link JedisCluster}. + */ + @Override + public void close() throws IOException { + this.jedisCluster.close(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java new file mode 100644 index 0000000..55dbfc2 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import java.io.IOException; +import java.io.Serializable; + +/** + * The container for all available Redis commands. + */ +public interface RedisCommandsContainer extends Serializable { + + /** + * Open the Jedis container. + * + * @throws Exception if the instance can not be opened properly + */ + void open() throws Exception; + + /** + * Sets field in the hash stored at key to value. + * If key does not exist, a new key holding a hash is created. + * If field already exists in the hash, it is overwritten. + * + * @param key Hash name + * @param hashField Hash field + * @param value Hash value + */ + void hset(String key, String hashField, String value); + + /** + * Insert the specified value at the tail of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operation. + * + * @param listName Name of the List + * @param value Value to be added + */ + void rpush(String listName, String value); + + /** + * Insert the specified value at the head of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operation. + * + * @param listName Name of the List + * @param value Value to be added + */ + void lpush(String listName, String value); + + /** + * Add the specified member to the set stored at key. + * Specified members that are already a member of this set are ignored. + * If key does not exist, a new set is created before adding the specified members. 
+ * + * @param setName Name of the Set + * @param value Value to be added + */ + void sadd(String setName, String value); + + /** + * Posts a message to the given channel. + * + * @param channelName Name of the channel to which data will be published + * @param message the message + */ + void publish(String channelName, String message); + + /** + * Set key to hold the string value. If key already holds a value, it is overwritten, + * regardless of its type. Any previous time to live associated with the key is + * discarded on successful SET operation. + * + * @param key the key name in which value to be set + * @param value the value + */ + void set(String key, String value); + + /** + * Adds the given element to the HyperLogLog data structure + * stored at the given key. + * + * @param key The name of the key + * @param element the element + */ + void pfadd(String key, String element); + + /** + * Adds the specified member with the specified score to the sorted set stored at key. + * + * @param key The name of the Sorted Set + * @param score Score of the element, passed as a String + * @param element element to be added + */ + void zadd(String key, String score, String element); + + /** + * Close the Jedis container. + * + * @throws IOException if the instance can not be closed properly + */ + void close() throws IOException; +}
