http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java new file mode 100644 index 0000000..dc5396a --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +/** + * The builder for {@link RedisCommandsContainer}. + */ +public class RedisCommandsContainerBuilder { + + /** + * Initialize the {@link RedisCommandsContainer} based on the instance type. + * @param flinkJedisConfigBase configuration base + * @return @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null + */ + public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){ + if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){ + FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig); + } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) { + FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig); + } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) { + FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase; + return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig); + } else { + throw new IllegalArgumentException("Jedis configuration not found"); + } + } + + /** + * Builds container for single Redis environment. 
+ * + * @param jedisPoolConfig configuration for JedisPool + * @return container for single Redis environment + * @throws NullPointerException if jedisPoolConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) { + Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); + + JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new RedisContainer(jedisPool); + } + + /** + * Builds container for Redis Cluster environment. + * + * @param jedisClusterConfig configuration for JedisCluster + * @return container for Redis Cluster environment + * @throws NullPointerException if jedisClusterConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) { + Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle()); + + JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(), + jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig); + return new RedisClusterContainer(jedisCluster); + } + + /** + * Builds container for Redis Sentinel environment. 
+ * + * @param jedisSentinelConfig configuration for JedisSentinel + * @return container for Redis sentinel environment + * @throws NullPointerException if jedisSentinelConfig is null + */ + public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) { + Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null"); + + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle()); + + JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, + jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(), + jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase()); + return new RedisContainer(jedisSentinelPool); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java new file mode 100644 index 0000000..ba4bbda --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private transient JedisPool jedisPool; + private transient JedisSentinelPool jedisSentinelPool; + + /** + * Use this constructor if to connect with single Redis server. + * + * @param jedisPool JedisPool which actually manages Jedis instances + */ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + this.jedisSentinelPool = null; + } + + /** + * Use this constructor if Redis environment is clustered with sentinels. + * + * @param sentinelPool SentinelPool which actually manages Jedis instances + */ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisPool = null; + this.jedisSentinelPool = sentinelPool; + } + + /** + * Closes the Jedis instances. 
+ */ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void open() throws Exception { + + // echo() tries to open a connection and echos back the + // message passed as argument. Here we use it to monitor + // if we can communicate with the cluster. + + getInstance().echo("Test"); + } + + @Override + public void hset(final String key, final String hashField, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(key, hashField, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}", + key, hashField, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void rpush(final String listName, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}", + listName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void lpush(String listName, String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.lpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command LUSH to list {} error message {}", + listName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void sadd(final String setName, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.sadd(setName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}", 
+ setName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void publish(final String channelName, final String message) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.publish(channelName, message); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}", + channelName, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void set(final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.set(key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command SET to key {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void pfadd(final String key, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.pfadd(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command PFADD to key {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void zadd(final String key, final String score, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.zadd(key, Double.valueOf(score), element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command ZADD to set {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + /** + * Returns Jedis instance from the pool. 
+ * + * @return the Jedis instance + */ + private Jedis getInstance() { + if (jedisSentinelPool != null) { + return jedisSentinelPool.getResource(); + } else { + return jedisPool.getResource(); + } + } + + /** + * Closes the jedis instance after finishing the command. + * + * @param jedis The jedis instance + */ + private void releaseInstance(final Jedis jedis) { + if (jedis == null) { + return; + } + try { + jedis.close(); + } catch (Exception e) { + LOG.error("Failed to close (return) instance to pool", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java new file mode 100644 index 0000000..b0661c7 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available commands for Redis. Each command belongs to a {@link RedisDataType} group. + */ +public enum RedisCommand { + + /** + * Insert the specified value at the head of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operations. + */ + LPUSH(RedisDataType.LIST), + + /** + * Insert the specified value at the tail of the list stored at key. + * If key does not exist, it is created as empty list before performing the push operation. + */ + RPUSH(RedisDataType.LIST), + + /** + * Add the specified member to the set stored at key. + * Specified member that is already a member of this set is ignored. + */ + SADD(RedisDataType.SET), + + /** + * Set key to hold the string value. If key already holds a value, + * it is overwritten, regardless of its type. + */ + SET(RedisDataType.STRING), + + /** + * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument. + */ + PFADD(RedisDataType.HYPER_LOG_LOG), + + /** + * Posts a message to the given channel. + */ + PUBLISH(RedisDataType.PUBSUB), + + /** + * Adds the specified members with the specified score to the sorted set stored at key. + */ + ZADD(RedisDataType.SORTED_SET), + + /** + * Sets field in the hash stored at key to value. If key does not exist, + * a new key holding a hash is created. If field already exists in the hash, it is overwritten. + */ + HSET(RedisDataType.HASH); + + /** + * The {@link RedisDataType} this command belongs to. + */ + private RedisDataType redisDataType; + + RedisCommand(RedisDataType redisDataType) { + this.redisDataType = redisDataType; + } + + + /** + * The {@link RedisDataType} this command belongs to. 
+ * @return the {@link RedisDataType} + */ + public RedisDataType getRedisDataType(){ + return redisDataType; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java new file mode 100644 index 0000000..1eea48a --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * The description of the command type. This must be passed while creating new {@link RedisMapper}. 
+ * <p>When creating descriptor for the group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}, + * you need to use first constructor {@link #RedisCommandDescription(RedisCommand, String)}. + * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException} + * + * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} + * you can use second constructor {@link #RedisCommandDescription(RedisCommand)} + */ +public class RedisCommandDescription implements Serializable { + + private static final long serialVersionUID = 1L; + + private RedisCommand redisCommand; + + /** + * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. + * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. + * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. + * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element. + * {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH} + * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score. + * {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET} + */ + private String additionalKey; + + /** + * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}. + * If different data type is specified, {@code additionalKey} is ignored. 
+ * @param redisCommand the redis command type {@link RedisCommand} + * @param additionalKey additional key for Hash and Sorted set data type + */ + public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) { + Preconditions.checkNotNull(redisCommand, "Redis command type can not be null"); + this.redisCommand = redisCommand; + this.additionalKey = additionalKey; + + if (redisCommand.getRedisDataType() == RedisDataType.HASH || + redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) { + if (additionalKey == null) { + throw new IllegalArgumentException("Hash and Sorted Set should have additional key"); + } + } + } + + /** + * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}. + * + * @param redisCommand the redis data type {@link RedisCommand} + */ + public RedisCommandDescription(RedisCommand redisCommand) { + this(redisCommand, null); + } + + /** + * Returns the {@link RedisCommand}. + * + * @return the command type of the mapping + */ + public RedisCommand getCommand() { + return redisCommand; + } + + /** + * Returns the additional key if data type is {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. 
+ * + * @return the additional key + */ + public String getAdditionalKey() { + return additionalKey; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java new file mode 100644 index 0000000..6e3997c --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available data type for Redis. + */ +public enum RedisDataType { + + /** + * Strings are the most basic kind of Redis value. 
/**
 * The Redis data types a {@code RedisCommand} can be grouped under.
 */
public enum RedisDataType {

	/**
	 * The most basic kind of Redis value. Redis Strings are binary safe, meaning a string
	 * can contain any kind of data, for instance a JPEG image or a serialized Ruby object.
	 * A String value can be at max 512 Megabytes in length.
	 */
	STRING,

	/**
	 * Maps between string fields and string values.
	 */
	HASH,

	/**
	 * Lists of strings, sorted by insertion order.
	 */
	LIST,

	/**
	 * Unordered collections of Strings.
	 */
	SET,

	/**
	 * Like Redis Sets, non repeating collections of Strings. Every member of a Sorted Set
	 * is additionally associated with a score, which is used to keep the set ordered from
	 * the smallest to the greatest score. While members are unique, scores may be repeated.
	 */
	SORTED_SET,

	/**
	 * A probabilistic data structure used in order to count unique things.
	 */
	HYPER_LOG_LOG,

	/**
	 * Redis implementation of the publish and subscribe paradigm. Published messages are
	 * characterized into channels, without knowledge of what (if any) subscribers there may be.
	 * Subscribers express interest in one or more channels, and only receive messages
	 * that are of interest, without knowledge of what (if any) publishers there are.
	 */
	PUBSUB
}
+ *<p>Example: + *<pre>{@code + *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> { + * public RedisDataTypeDescription getCommandDescription() { + * return new RedisDataTypeDescription(RedisCommand.PUBLISH); + * } + * public String getKeyFromData(Tuple2<String, String> data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2<String, String> data) { + * return data.f1; + * } + *} + *}</pre> + * + * @param <T> The type of the element handled by this {@code RedisMapper} + */ +public interface RedisMapper<T> extends Function, Serializable { + + /** + * Returns descriptor which defines data type. + * + * @return data type descriptor + */ + RedisCommandDescription getCommandDescription(); + + /** + * Extracts key from data. + * + * @param data source data + * @return key + */ + String getKeyFromData(T data); + + /** + * Extracts value from data. + * + * @param data source data + * @return value + */ + String getValueFromData(T data); +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java new file mode 100644 index 0000000..7d98f2d --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import redis.embedded.RedisServer; + +import java.io.IOException; + +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase { + + public static final int REDIS_PORT = getAvailablePort(); + public static final String REDIS_HOST = "127.0.0.1"; + + private static RedisServer redisServer; + + @BeforeClass + public static void createRedisServer() throws IOException, InterruptedException { + redisServer = new RedisServer(REDIS_PORT); + redisServer.start(); + } + + @AfterClass + public static void stopRedisServer(){ + redisServer.stop(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java new file mode 100644 index 0000000..dc59ba4 --- 
/dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisSentinelPool; +import redis.embedded.RedisCluster; +import redis.embedded.util.JedisUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public class RedisSentinelClusterTest extends TestLogger { + + private static RedisCluster cluster; + 
private static final String REDIS_MASTER = "master"; + private static final String TEST_KEY = "testKey"; + private static final String TEST_VALUE = "testValue"; + private static final List<Integer> sentinels = Arrays.asList(getAvailablePort(), getAvailablePort()); + private static final List<Integer> group1 = Arrays.asList(getAvailablePort(), getAvailablePort()); + + private JedisSentinelPool jedisSentinelPool; + private FlinkJedisSentinelConfig jedisSentinelConfig; + + @BeforeClass + public static void setUpCluster(){ + cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1) + .serverPorts(group1).replicationGroup(REDIS_MASTER, 1) + .build(); + cluster.start(); + } + + @Before + public void setUp() { + Set<String> hosts = JedisUtil.sentinelHosts(cluster); + jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER) + .setSentinels(hosts).build(); + jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels()); + } + + @Test + public void testRedisSentinelOperation() { + RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig); + Jedis jedis = null; + try{ + jedis = jedisSentinelPool.getResource(); + redisContainer.set(TEST_KEY, TEST_VALUE); + assertEquals(TEST_VALUE, jedis.get(TEST_KEY)); + }finally { + if (jedis != null){ + jedis.close(); + } + } + } + + @After + public void tearDown() throws IOException { + if (jedisSentinelPool != null) { + jedisSentinelPool.close(); + } + } + + @AfterClass + public static void tearDownCluster() throws IOException { + if (cluster != null) { // always stop the cluster started in setUpCluster(); the former "!isActive()" guard skipped stop() for a running cluster + cluster.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java new file mode 100644 index 0000000..21f3cca --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import static org.junit.Assert.assertEquals; + +public class RedisSinkITCase extends RedisITCaseBase { + + private FlinkJedisPoolConfig jedisPoolConfig; + private static final Long NUM_ELEMENTS = 20L; + private static final String REDIS_KEY = "TEST_KEY"; + private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; + + StreamExecutionEnvironment env; + + + private Jedis jedis; + + @Before + public void setUp(){ + jedisPoolConfig = new FlinkJedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + jedis = new Jedis(REDIS_HOST, REDIS_PORT); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisListDataType() throws Exception { + DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction()); + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisCommandMapper(RedisCommand.LPUSH)); + + source.addSink(redisSink); + env.execute("Test Redis List Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSetDataType() throws Exception { + DataStreamSource<Tuple2<String, String>> source = 
env.addSource(new TestSourceFunction()); + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisCommandMapper(RedisCommand.SADD)); + + source.addSink(redisSink); + env.execute("Test Redis Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisHyperLogLogDataType() throws Exception { + DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction()); + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisCommandMapper(RedisCommand.PFADD)); + + source.addSink(redisSink); + env.execute("Test Redis Hyper Log Log Data Type"); + + assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY))); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSortedSetDataType() throws Exception { + DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet()); + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisAdditionalDataMapper(RedisCommand.ZADD)); + + source.addSink(redisSink); + env.execute("Test Redis Sorted Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY)); + + jedis.del(REDIS_ADDITIONAL_KEY); + } + + @Test + public void testRedisHashDataType() throws Exception { + DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash()); + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisAdditionalDataMapper(RedisCommand.HSET)); + + source.addSink(redisSink); + env.execute("Test Redis Hash Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY)); + + jedis.del(REDIS_ADDITIONAL_KEY); + } + + @After + public void tearDown(){ + if(jedis != null){ + jedis.close(); + } + } + + private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> { + private static final long 
serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception { + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception { + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ctx.collect(new Tuple2<>("" + i, "message #" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String, String>> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception { + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ctx.collect(new Tuple2<>( "message #" + i, "" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>>{ + + private RedisCommand redisCommand; + + public RedisCommandMapper(RedisCommand redisCommand){ + this.redisCommand = redisCommand; + } + + @Override + public RedisCommandDescription getCommandDescription() { + return new RedisCommandDescription(redisCommand); + } + + @Override + public String getKeyFromData(Tuple2<String, String> data) { + return data.f0; + } + + @Override + public String getValueFromData(Tuple2<String, String> data) { + return data.f1; + } + } + + public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>>{ + + private RedisCommand redisCommand; + + 
public RedisAdditionalDataMapper(RedisCommand redisCommand){ + this.redisCommand = redisCommand; + } + + @Override + public RedisCommandDescription getCommandDescription() { + return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + } + + @Override + public String getKeyFromData(Tuple2<String, String> data) { + return data.f0; + } + + @Override + public String getValueFromData(Tuple2<String, String> data) { + return data.f1; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java new file mode 100644 index 0000000..caf3945 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.JedisPool; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Before; +import org.junit.After; +import org.junit.Test; +import redis.clients.jedis.JedisPubSub; + +import static org.junit.Assert.assertEquals; + +public class RedisSinkPublishITCase extends RedisITCaseBase { + + private static final int NUM_ELEMENTS = 20; + private static final String REDIS_CHANNEL = "CHANNEL"; + + private static final List<String> sourceList = new ArrayList<>(); + private Thread sinkThread; + private PubSub pubSub; + + @Before + public void before() throws Exception { + pubSub = new PubSub(); + sinkThread = new Thread(new Subscribe(pubSub)); + } + + @Test + public void redisSinkTest() throws Exception { + sinkThread.start(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction()); + + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper()); + + source.addSink(redisSink); + + env.execute("Redis Sink Test"); + + assertEquals(NUM_ELEMENTS, sourceList.size()); + } + + @After + public 
void after() throws Exception { + pubSub.unsubscribe(); + sinkThread.join(); + sourceList.clear(); + } + + private class Subscribe implements Runnable { + private PubSub localPubSub; + private Subscribe(PubSub pubSub){ + this.localPubSub = pubSub; + } + + @Override + public void run() { + // subscribe() is expected to return once after() unsubscribes; release the connection and the pool afterwards + JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT); + try { + redis.clients.jedis.Jedis subscriber = pool.getResource(); + try { + subscriber.subscribe(localPubSub, REDIS_CHANNEL); + } finally { + subscriber.close(); + } + } finally { + pool.close(); + } + } + } + + private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception { + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i)); + } + } + + @Override + public void cancel() { + running = false; + } + } + + public static class PubSub extends JedisPubSub { + + @Override + public void onMessage(String channel, String message) { + sourceList.add(message); + } + + } + + private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>>{ + + @Override + public RedisCommandDescription getCommandDescription() { + return new RedisCommandDescription(RedisCommand.PUBLISH); + } + + @Override + public String getKeyFromData(Tuple2<String, String> data) { + return data.f0; + } + + @Override + public String getValueFromData(Tuple2<String, String> data) { + return data.f1; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java new file mode 100644 
index 0000000..59f59f2 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import redis.clients.jedis.exceptions.JedisConnectionException; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +import static 
org.junit.Assert.fail; + +public class RedisSinkTest extends TestLogger { + + @Test(expected=NullPointerException.class) + public void shouldThrowNullPointExceptionIfDataMapperIsNull(){ // uses a buildable config so the NPE must come from RedisSink, not from Builder.build() + new RedisSink<>(new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(1234).build(), null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull(){ // uses a buildable config so the NPE must come from RedisSink, not from Builder.build() + new RedisSink<>(new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(1234).build(), new TestMapper(null)); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerExceptionIfConfigurationIsNull(){ + new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH))); + } + + @Test + public void testRedisDownBehavior() throws Exception { + + // create a wrong configuration so that open() fails. + + FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder() + .setHost("127.0.0.1") + .setPort(1234).build(); + + testDownBehavior(wrongJedisPoolConfig); + } + + @Test + public void testRedisClusterDownBehavior() throws Exception { + + Set<InetSocketAddress> hosts = new HashSet<>(); + hosts.add(new InetSocketAddress("127.0.0.1", 1234)); + + // create a wrong configuration so that open() fails. + + FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder() + .setNodes(hosts) + .setTimeout(100) + .setMaxIdle(1) + .setMaxTotal(1) + .setMinIdle(1).build(); + + testDownBehavior(wrongJedisClusterConfig); + } + + @Test + public void testRedisSentinelDownBehavior() throws Exception { + + Set<String> hosts = new HashSet<>(); + hosts.add("localhost:55095"); + + // create a wrong configuration so that open() fails. 
+ + FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder() + .setMasterName("master") + .setSentinels(hosts) + .build(); + + testDownBehavior(wrongJedisSentinelConfig); + } + + private void testDownBehavior(FlinkJedisConfigBase config) throws Exception { + RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config, + new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD)); + + try { + redisSink.open(new Configuration()); + } catch (Exception e) { + + // search for nested JedisConnectionExceptions + // because this is the expected behavior + + Throwable t = e; + int depth = 0; + while (!(t instanceof JedisConnectionException)) { + t = t.getCause(); + if (t == null || depth++ == 20) { + throw e; + } + } + } + } + + private class TestMapper implements RedisMapper<Tuple2<String, String>>{ + private RedisCommandDescription redisCommandDescription; + + public TestMapper(RedisCommandDescription redisCommandDescription){ + this.redisCommandDescription = redisCommandDescription; + } + @Override + public RedisCommandDescription getCommandDescription() { + return redisCommandDescription; + } + + @Override + public String getKeyFromData(Tuple2<String, String> data) { + return data.f0; + } + + @Override + public String getValueFromData(Tuple2<String, String> data) { + return data.f1; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java new file mode 100644 index 0000000..ed1d713 --- /dev/null +++ 
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +public class FlinkJedisConfigBaseTest extends TestLogger { + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){ + new TestConfig(-1, 0, 0, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){ + new TestConfig(1, -1, 0, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){ + new TestConfig(0, 0, -1, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){ + new TestConfig(0, 0, 0, -1); + } + + private class TestConfig extends FlinkJedisConfigBase{ + + protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, 
maxIdle, minIdle); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java new file mode 100644 index 0000000..40db578 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +public class JedisClusterConfigTest extends TestLogger { + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointExceptionIfNodeValueIsNull(){ + FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder(); + builder.setMinIdle(0) + .setMaxIdle(0) + .setMaxTotal(0) + .setTimeout(0) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty(){ + Set<InetSocketAddress> set = new HashSet<>(); + FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder(); + builder.setMinIdle(0) + .setMaxIdle(0) + .setMaxTotal(0) + .setTimeout(0) + .setNodes(set) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java new file mode 100644 index 0000000..dc16cfe --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +public class JedisPoolConfigTest extends TestLogger { + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointExceptionIfHostValueIsNull(){ + FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder(); + builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java new file mode 100644 index 0000000..8445fae --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +public class JedisSentinelConfigTest extends TestLogger { + + public static final String MASTER_NAME = "test-master"; + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointExceptionIfMasterValueIsNull(){ + FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder(); + Set<String> sentinels = new HashSet<>(); + sentinels.add("127.0.0.1"); + builder.setSentinels(sentinels).build(); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointExceptionIfSentinelsValueIsNull(){ + FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder(); + builder.setMasterName(MASTER_NAME).build(); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfSentinelsValueIsEmpty(){ // name now matches the expected exception type + FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder(); + Set<String> sentinels = new HashSet<>(); + builder.setMasterName(MASTER_NAME).setSentinels(sentinels).build(); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java new file mode 100644 index 0000000..b0eee48 --- /dev/null +++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.streaming.connectors.redis.RedisSinkITCase; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisDataTypeDescriptionTest extends TestLogger { + + @Test(expected=IllegalArgumentException.class) + public void shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType(){ + RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET); + redisCommandMapper.getCommandDescription(); + } + + @Test + public void shouldReturnNullForAdditionalDataType(){ + RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH); + RedisCommandDescription redisDataTypeDescription = redisCommandMapper.getCommandDescription(); + assertEquals(RedisDataType.LIST, redisDataTypeDescription.getCommand().getRedisDataType()); + assertNull(redisDataTypeDescription.getAdditionalKey()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-twitter/pom.xml b/flink-connectors/flink-connector-twitter/pom.xml new file mode 100644 index 0000000..27a966f --- /dev/null +++ b/flink-connectors/flink-connector-twitter/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-twitter_2.10</artifactId> + <name>flink-connector-twitter</name> + + <packaging>jar</packaging> + + <properties> + <hbc-core.version>2.2.0</hbc-core.version> + </properties> + + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>hbc-core</artifactId> + <version>${hbc-core.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. 
--> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <configuration> + <artifactSet> + <includes combine.children="append"> + <!-- We include all dependencies that transitively depend on guava --> + <include>com.twitter:hbc-core</include> + <include>com.twitter:joauth</include> + <include>org.apache.httpcomponents:httpclient</include> + <include>org.apache.httpcomponents:httpcore</include> + </includes> + </artifactSet> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java new file mode 100644 index 0000000..66fa237 --- /dev/null +++ b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.twitter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.util.Objects; +import java.util.Properties; + +import com.twitter.hbc.common.DelimitedStreamReader; +import com.twitter.hbc.core.endpoint.StreamingEndpoint; +import com.twitter.hbc.core.processor.HosebirdMessageProcessor; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.hbc.ClientBuilder; +import com.twitter.hbc.core.Constants; +import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; +import com.twitter.hbc.httpclient.BasicClient; +import com.twitter.hbc.httpclient.auth.Authentication; +import com.twitter.hbc.httpclient.auth.OAuth1; + +/** + * Implementation of {@link SourceFunction} specialized to emit tweets from + * Twitter. This is not a parallel source because the Twitter API only allows + * two concurrent connections. 
+ */
public class TwitterSource extends RichSourceFunction<String> implements StoppableFunction {

    private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);

    private static final long serialVersionUID = 1L;

    // ----- Required property keys

    public static final String CONSUMER_KEY = "twitter-source.consumerKey";

    public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";

    public static final String TOKEN = "twitter-source.token";

    public static final String TOKEN_SECRET = "twitter-source.tokenSecret";

    // ------ Optional property keys

    public static final String CLIENT_NAME = "twitter-source.name";

    public static final String CLIENT_HOSTS = "twitter-source.hosts";

    public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";

    // ----- Fields set by the constructor

    private final Properties properties;

    private EndpointInitializer initializer = new SampleStatusesEndpoint();

    // ----- Runtime fields

    /** The streaming client; created and connected in {@link #run}. */
    private transient BasicClient client;

    /** Monitor that {@link #run} waits on; created in {@link #open}, so it may still be null in {@link #close()}. */
    private transient Object waitLock;

    /**
     * Loop flag of {@link #run}. It is cleared by {@link #close()}, which is reached through
     * {@link #cancel()} and {@link #stop()}; it is volatile so that a write made by the
     * cancelling thread is visible to the thread executing {@link #run}.
     */
    private transient volatile boolean running = true;


    /**
     * Create {@link TwitterSource} for streaming.
     *
     * @param properties For the source. Must contain {@link #CONSUMER_KEY},
     *                   {@link #CONSUMER_SECRET}, {@link #TOKEN} and {@link #TOKEN_SECRET}.
     * @throws NullPointerException if {@code properties} is null
     * @throws IllegalArgumentException if one of the required keys is missing
     */
    public TwitterSource(Properties properties) {
        Objects.requireNonNull(properties, "properties");
        checkProperty(properties, CONSUMER_KEY);
        checkProperty(properties, CONSUMER_SECRET);
        checkProperty(properties, TOKEN);
        checkProperty(properties, TOKEN_SECRET);

        this.properties = properties;
    }

    /**
     * Verifies that a required key is present.
     *
     * @throws IllegalArgumentException if {@code p} does not contain {@code key}
     */
    private static void checkProperty(Properties p, String key) {
        if (!p.containsKey(key)) {
            throw new IllegalArgumentException("Required property '" + key + "' not set.");
        }
    }


    /**
     * Set a custom endpoint initializer.
     *
     * @param initializer the initializer to use instead of the default sample endpoint;
     *                    must be non-null and is passed through
     *                    {@code ClosureCleaner.ensureSerializable} before being stored
     */
    public void setCustomEndpointInitializer(EndpointInitializer initializer) {
        Objects.requireNonNull(initializer, "Initializer has to be set");
        ClosureCleaner.ensureSerializable(initializer);
        this.initializer = initializer;
    }

    // ----- Source lifecycle

    /**
     * Creates the monitor object that {@link #run} waits on.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        waitLock = new Object();
    }


    /**
     * Builds the client from the configured properties and endpoint, connects it, and then
     * blocks until {@link #close()} clears the running flag. Every line read by the message
     * processor is forwarded to {@code ctx}.
     */
    @Override
    public void run(final SourceContext<String> ctx) throws Exception {
        LOG.info("Initializing Twitter Streaming API connection");

        StreamingEndpoint endpoint = initializer.createEndpoint();

        Authentication auth = new OAuth1(properties.getProperty(CONSUMER_KEY),
            properties.getProperty(CONSUMER_SECRET),
            properties.getProperty(TOKEN),
            properties.getProperty(TOKEN_SECRET));

        client = new ClientBuilder()
            .name(properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
            .hosts(properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
            .endpoint(endpoint)
            .authentication(auth)
            .processor(new HosebirdMessageProcessor() {
                // only used inside this anonymous class, so there is no reason to expose it
                private DelimitedStreamReader reader;

                @Override
                public void setup(InputStream input) {
                    reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET, Integer.parseInt(properties.getProperty(CLIENT_BUFFER_SIZE, "50000")));
                }

                @Override
                public boolean process() throws IOException, InterruptedException {
                    String line = reader.readLine();
                    ctx.collect(line);
                    return true;
                }
            })
            .build();

        client.connect();
        // NOTE(review): a close()/cancel() that arrives before this assignment is overwritten
        // here and the loop below would then keep waiting; confirm whether the runtime can
        // cancel this early.
        running = true;

        LOG.info("Twitter Streaming API connection established successfully");

        // just wait now; the timeout bounds the delay if a notification is missed
        while (running) {
            synchronized (waitLock) {
                waitLock.wait(100L);
            }
        }
    }

    /**
     * Clears the running flag, stops the client if one was created, and wakes up {@link #run}.
     * Safe to call before {@link #open} has been invoked.
     */
    @Override
    public void close() {
        this.running = false;
        LOG.info("Closing source");
        if (client != null) {
            // client seems to be thread-safe
            client.stop();
        }
        // leave main method; the lock only exists once open() has run, so guard against null
        final Object lock = waitLock;
        if (lock != null) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    @Override
    public void cancel() {
        LOG.info("Cancelling Twitter source");
        close();
    }

    @Override
    public void stop() {
        LOG.info("Stopping Twitter source");
        close();
    }

    // ------ Custom endpoints

    /**
     * Implementing this interface allows users of this source to set a custom endpoint.
     */
    public interface EndpointInitializer {
        StreamingEndpoint createEndpoint();
    }

    /**
     * Default endpoint initializer returning the {@link StatusesSampleEndpoint}.
     */
    private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
        @Override
        public StreamingEndpoint createEndpoint() {
            // this default endpoint initializer returns the sample endpoint: Returning a sample from the firehose (all tweets)
            StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
            endpoint.stallWarnings(false);
            endpoint.delimited(false);
            return endpoint;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml b/flink-connectors/flink-hadoop-compatibility/pom.xml new file mode 100644 index 0000000..5938560 --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/pom.xml @@ -0,0 +1,182 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-hadoop-compatibility_2.10</artifactId> + <name>flink-hadoop-compatibility</name> + + <packaging>jar</packaging> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + </dependencies> + + + <build> + <plugins> + <!-- activate API compatibility checks --> + <plugin> + <groupId>com.github.siom79.japicmp</groupId> + <artifactId>japicmp-maven-plugin</artifactId> + </plugin> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to 
build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Scala Code Style, most of the configuration done via plugin management --> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <configuration> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java new file mode 100644 index 0000000..7bcb4bf --- /dev/null +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.WritableComparator; +import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; +import org.apache.hadoop.io.Writable; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable + * interface defines the serialization and deserialization routines for the data type. + * + * @param <T> The type of the class represented by this type information. 
+ */ +@Public +public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> { + + private static final long serialVersionUID = 1L; + + private final Class<T> typeClass; + + @PublicEvolving + public WritableTypeInfo(Class<T> typeClass) { + this.typeClass = checkNotNull(typeClass); + + checkArgument( + Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class), + "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + @PublicEvolving + public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) { + if(Comparable.class.isAssignableFrom(typeClass)) { + return new WritableComparator(sortOrderAscending, typeClass); + } + else { + throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " + + "Class does not implement Comparable interface."); + } + } + + @Override + @PublicEvolving + public boolean isBasicType() { + return false; + } + + @Override + @PublicEvolving + public boolean isTupleType() { + return false; + } + + @Override + @PublicEvolving + public int getArity() { + return 1; + } + + @Override + @PublicEvolving + public int getTotalFields() { + return 1; + } + + @Override + @PublicEvolving + public Class<T> getTypeClass() { + return this.typeClass; + } + + @Override + @PublicEvolving + public boolean isKeyType() { + return Comparable.class.isAssignableFrom(typeClass); + } + + @Override + @PublicEvolving + public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) { + return new WritableSerializer<T>(typeClass); + } + + @Override + public String toString() { + return "WritableType<" + typeClass.getName() + ">"; + } + + @Override + public int hashCode() { + return typeClass.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof WritableTypeInfo) { + 
@SuppressWarnings("unchecked") + WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj; + + return writableTypeInfo.canEqual(this) && + typeClass == writableTypeInfo.typeClass; + + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof WritableTypeInfo; + } + + // -------------------------------------------------------------------------------------------- + + @PublicEvolving + static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) { + if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) { + return new WritableTypeInfo<T>(typeClass); + } + else { + throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName()); + } + } + +}
