STORM-691 Add basic lookup / persist bolts * Add Basic lookup / persist Bolts ** support data types : string, list, hash, set, sorted set, hyperloglog * rename util package to common
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7c0bf8a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7c0bf8a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7c0bf8a Branch: refs/heads/nimbus-ha-branch Commit: f7c0bf8a7c843c6e555ee982a85e3952d1c28b33 Parents: 64d7ac6 Author: Jungtaek Lim <[email protected]> Authored: Sat Feb 28 22:29:32 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Sat Feb 28 22:29:32 2015 +0900 ---------------------------------------------------------------------- .../storm/redis/bolt/AbstractRedisBolt.java | 8 +- .../storm/redis/bolt/RedisLookupBolt.java | 109 ++++++++++++++++++ .../apache/storm/redis/bolt/RedisStoreBolt.java | 97 ++++++++++++++++ .../redis/common/config/JedisClusterConfig.java | 82 +++++++++++++ .../redis/common/config/JedisPoolConfig.java | 97 ++++++++++++++++ .../common/container/JedisClusterContainer.java | 47 ++++++++ .../JedisCommandsContainerBuilder.java | 38 ++++++ .../JedisCommandsInstanceContainer.java | 25 ++++ .../redis/common/container/JedisContainer.java | 65 +++++++++++ .../common/mapper/RedisDataTypeDescription.java | 33 ++++++ .../redis/common/mapper/RedisLookupMapper.java | 40 +++++++ .../storm/redis/common/mapper/RedisMapper.java | 5 + .../redis/common/mapper/RedisStoreMapper.java | 21 ++++ .../storm/redis/common/mapper/TupleMapper.java | 27 +++++ .../trident/mapper/TridentTupleMapper.java | 27 ----- .../trident/state/RedisClusterMapState.java | 2 +- .../redis/trident/state/RedisClusterState.java | 2 +- .../trident/state/RedisClusterStateQuerier.java | 10 +- .../trident/state/RedisClusterStateUpdater.java | 10 +- .../redis/trident/state/RedisMapState.java | 2 +- .../storm/redis/trident/state/RedisState.java | 2 +- .../redis/trident/state/RedisStateQuerier.java | 10 +- .../state/RedisStateSetCountQuerier.java | 10 +- .../trident/state/RedisStateSetUpdater.java | 10 +- .../redis/trident/state/RedisStateUpdater.java | 10 +- .../redis/util/config/JedisClusterConfig.java | 82 ------------- .../redis/util/config/JedisPoolConfig.java | 97 ---------------- .../util/container/JedisClusterContainer.java | 47 -------- .../JedisCommandsContainerBuilder.java | 38 ------ .../JedisCommandsInstanceContainer.java | 25 ---- .../redis/util/container/JedisContainer.java | 65 ----------- .../storm/redis/topology/LookupWordCount.java | 115 +++++++++++++------ .../redis/topology/PersistentWordCount.java | 46 +++++++- .../storm/redis/topology/WordCounter.java | 19 ++- .../redis/trident/WordCountTridentRedis.java | 7 +- .../trident/WordCountTridentRedisCluster.java | 6 +- .../WordCountTridentRedisClusterMap.java | 8 +- .../redis/trident/WordCountTridentRedisMap.java | 9 +- .../redis/trident/WordCountTupleMapper.java | 10 +- 39 files changed, 872 insertions(+), 491 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java index 0b2a7f3..158fcaa 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java @@ -20,10 +20,10 @@ package org.apache.storm.redis.bolt; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichBolt; -import org.apache.storm.redis.util.config.JedisClusterConfig; -import org.apache.storm.redis.util.config.JedisPoolConfig; -import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder; -import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer; +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder; +import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer; import redis.clients.jedis.JedisCommands; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java new file mode 100644 index 0000000..c40e983 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.bolt; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import redis.clients.jedis.JedisCommands; + +import java.util.List; + +public class RedisLookupBolt extends AbstractRedisBolt { + private final RedisLookupMapper lookupMapper; + private final RedisDataTypeDescription.RedisDataType dataType; + private final String additionalKey; + + public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) { + super(config); + + this.lookupMapper = lookupMapper; + + RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) { + super(config); + + this.lookupMapper = lookupMapper; + + RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + @Override + public void execute(Tuple input) { + String key = lookupMapper.getKeyFromTuple(input); + Object lookupValue = null; + + JedisCommands jedisCommand = null; + try { + jedisCommand = getInstance(); + + switch (dataType) { + case STRING: + lookupValue = jedisCommand.get(key); + break; + + case LIST: + lookupValue = jedisCommand.lpop(key); + break; + + case HASH: + lookupValue = jedisCommand.hget(additionalKey, key); + break; + + case SET: + lookupValue = jedisCommand.scard(key); + break; + + case SORTED_SET: + lookupValue = jedisCommand.zscore(additionalKey, key); + break; + + case HYPER_LOG_LOG: + lookupValue = jedisCommand.pfcount(key); + break; + } + + List<Values> values = lookupMapper.toTuple(input, lookupValue); + for (Values value : values) { + collector.emit(input, value); + } + + collector.ack(input); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(input); + } finally { + returnInstance(jedisCommand); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + lookupMapper.declareOutputFields(declarer); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java new file mode 100644 index 0000000..5602c44 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.bolt; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; +import redis.clients.jedis.JedisCommands; + +public class RedisStoreBolt extends AbstractRedisBolt { + private final RedisStoreMapper storeMapper; + private final RedisDataTypeDescription.RedisDataType dataType; + private final String additionalKey; + + public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) { + super(config); + this.storeMapper = storeMapper; + + RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) { + super(config); + this.storeMapper = storeMapper; + + RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); + this.dataType = dataTypeDescription.getDataType(); + this.additionalKey = dataTypeDescription.getAdditionalKey(); + } + + @Override + public void execute(Tuple input) { + String key = storeMapper.getKeyFromTuple(input); + String value = storeMapper.getValueFromTuple(input); + + JedisCommands jedisCommand = null; + try { + jedisCommand = getInstance(); + + switch (dataType) { + case STRING: + jedisCommand.set(key, value); + break; + + case LIST: + jedisCommand.rpush(key, value); + break; + + case HASH: + jedisCommand.hset(additionalKey, key, value); + break; + + case SET: + jedisCommand.sadd(key, value); + break; + + case SORTED_SET: + jedisCommand.zadd(additionalKey, Double.valueOf(value), key); + + case HYPER_LOG_LOG: + jedisCommand.pfadd(key, value); + break; + } + + collector.ack(input); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(input); + } finally { + returnInstance(jedisCommand); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java new file mode 100644 index 0000000..a13eced --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.config; + +import com.google.common.base.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +public class JedisClusterConfig implements Serializable { + private Set<InetSocketAddress> nodes; + private int timeout; + private int maxRedirections; + + public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) { + this.nodes = nodes; + this.timeout = timeout; + this.maxRedirections = maxRedirections; + } + + public Set<HostAndPort> getNodes() { + Set<HostAndPort> ret = new HashSet<HostAndPort>(); + for (InetSocketAddress node : nodes) { + ret.add(new HostAndPort(node.getHostName(), node.getPort())); + } + return ret; + } + + public int getTimeout() { + return timeout; + } + + public int getMaxRedirections() { + return maxRedirections; + } + + public static class Builder { + private Set<InetSocketAddress> nodes; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int maxRedirections = 5; + + public Builder setNodes(Set<InetSocketAddress> nodes) { + this.nodes = nodes; + return this; + } + + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public Builder setMaxRedirections(int maxRedirections) { + this.maxRedirections = maxRedirections; + return this; + } + + public JedisClusterConfig build() { + Preconditions.checkNotNull(this.nodes, "Node information should be presented"); + + return new JedisClusterConfig(nodes, timeout, maxRedirections); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java new file mode 100644 index 0000000..cc5f6e4 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.config; + +import redis.clients.jedis.Protocol; + +import java.io.Serializable; + +public class JedisPoolConfig implements Serializable { + public static final String DEFAULT_HOST = "127.0.0.1"; + + private String host; + private int port; + private int timeout; + private int database; + private String password; + + public JedisPoolConfig(String host, int port, int timeout, String password, int database) { + this.host = host; + this.port = port; + this.timeout = timeout; + this.database = database; + this.password = password; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public int getTimeout() { + return timeout; + } + + public int getDatabase() { + return database; + } + + public String getPassword() { + return password; + } + + public static class Builder { + private String host = DEFAULT_HOST; + private int port = Protocol.DEFAULT_PORT; + private int timeout = Protocol.DEFAULT_TIMEOUT; + private int database = Protocol.DEFAULT_DATABASE; + private String password; + + public Builder setHost(String host) { + this.host = host; + return this; + } + + public Builder setPort(int port) { + this.port = port; + return this; + } + + public Builder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public Builder setDatabase(int database) { + this.database = database; + return this; + } + + public Builder setPassword(String password) { + this.password = password; + return this; + } + + public JedisPoolConfig build() { + return new JedisPoolConfig(host, port, timeout, password, database); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java new file mode 100644 index 0000000..a1ff19f --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.container; + +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisCommands; + +import java.io.Closeable; + +public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable { + + private JedisCluster jedisCluster; + + public JedisClusterContainer(JedisCluster jedisCluster) { + this.jedisCluster = jedisCluster; + } + + @Override + public JedisCommands getInstance() { + return this.jedisCluster; + } + + @Override + public void returnInstance(JedisCommands jedisCommands) { + // do nothing + } + + @Override + public void close() { + this.jedisCluster.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java new file mode 100644 index 0000000..a2f8c2e --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.container; + +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; + +public class JedisCommandsContainerBuilder { + + public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); + + public static JedisCommandsInstanceContainer build(JedisPoolConfig config) { + JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase()); + return new JedisContainer(jedisPool); + } + + public static JedisCommandsInstanceContainer build(JedisClusterConfig config) { + JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG); + return new JedisClusterContainer(jedisCluster); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java new file mode 100644 index 0000000..9ec32b9 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.container; + +import redis.clients.jedis.JedisCommands; + +public interface JedisCommandsInstanceContainer { + JedisCommands getInstance(); + void returnInstance(JedisCommands jedisCommands); +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java new file mode 100644 index 0000000..621c05b --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCommands; +import redis.clients.jedis.JedisPool; + +import java.io.Closeable; +import java.io.IOException; + +public class JedisContainer implements JedisCommandsInstanceContainer, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class); + + private JedisPool jedisPool; + + public JedisContainer(JedisPool jedisPool) { + this.jedisPool = jedisPool; + } + + @Override + public JedisCommands getInstance() { + return jedisPool.getResource(); + } + + @Override + public void returnInstance(JedisCommands jedisCommands) { + if (jedisCommands == null) { + return; + } + + try { + ((Closeable) jedisCommands).close(); + } catch (IOException e) { + LOG.warn("Failed to close (return) instance to pool"); + try { + jedisPool.returnBrokenResource((Jedis) jedisCommands); + } catch (Exception e2) { + LOG.error("Failed to discard instance from pool"); + } + } + } + + @Override + public void close() { + jedisPool.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java new file mode 100644 index 0000000..d2a4af2 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java @@ -0,0 +1,33 @@ +package org.apache.storm.redis.common.mapper; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Serializable { + public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG } + + private RedisDataType dataType; + private String additionalKey; + + public RedisDataTypeDescription(RedisDataType dataType) { + this(dataType, null); + } + + public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) { + this.dataType = dataType; + this.additionalKey = additionalKey; + + if (dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET) { + if (additionalKey == null) { + throw new IllegalArgumentException("Hash and Sorted Set should have additional key"); + } + } + } + + public RedisDataType getDataType() { + return dataType; + } + + public String getAdditionalKey() { + return additionalKey; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java new file mode 100644 index 0000000..880aea1 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.mapper; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.ITuple; +import backtype.storm.tuple.Values; + +import java.util.List; + +public interface RedisLookupMapper extends TupleMapper, RedisMapper { + /** + * Converts return value from Redis to a list of storm values that can be emitted. + * @param input the input tuple. + * @param value Redis query response value. Can be String, Boolean, Long regarding of data type. + * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple. + */ + public List<Values> toTuple(ITuple input, Object value); + + /** + * declare what are the fields that this code will output. + * @param declarer + */ + void declareOutputFields(OutputFieldsDeclarer declarer); +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java new file mode 100644 index 0000000..d19acaa --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java @@ -0,0 +1,5 @@ +package org.apache.storm.redis.common.mapper; + +public interface RedisMapper { + public RedisDataTypeDescription getDataTypeDescription(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java new file mode 100644 index 0000000..b3d7adf --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.mapper; + +public interface RedisStoreMapper extends TupleMapper, RedisMapper { +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java new file mode 100644 index 0000000..86664b8 --- /dev/null +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.redis.common.mapper; + +import backtype.storm.tuple.ITuple; + +import java.io.Serializable; + +public interface TupleMapper extends Serializable { + public String getKeyFromTuple(ITuple tuple); + public String getValueFromTuple(ITuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java deleted file mode 100644 index 4c10143..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.trident.mapper; - -import storm.trident.tuple.TridentTuple; - -import java.io.Serializable; - -public interface TridentTupleMapper extends Serializable { - public String getKeyFromTridentTuple(TridentTuple tuple); - public String getValueFromTridentTuple(TridentTuple tuple); -} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java index 24c1df1..1154376 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java @@ -23,7 +23,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.storm.redis.util.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisClusterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCluster; http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java index 493ffdd..d74e838 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java @@ -18,7 +18,7 @@ package org.apache.storm.redis.trident.state; import backtype.storm.task.IMetricsContext; -import org.apache.storm.redis.util.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisClusterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCluster; http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java index e0207e2..17614a1 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java @@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state; import backtype.storm.tuple.Values; import com.google.common.collect.Lists; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCluster; @@ -33,9 +33,9 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class); private final String redisKeyPrefix; - private final TridentTupleMapper tupleMapper; + private final TupleMapper tupleMapper; - public RedisClusterStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) { + public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) { this.redisKeyPrefix = redisKeyPrefix; this.tupleMapper = tupleMapper; } @@ -52,7 +52,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTridentTuple(input); + String key = this.tupleMapper.getKeyFromTuple(input); if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { key = redisKeyPrefix + key; } @@ -72,7 +72,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat @Override public void execute(TridentTuple tuple, String s, TridentCollector collector) { - String key = this.tupleMapper.getKeyFromTridentTuple(tuple); + String key = this.tupleMapper.getKeyFromTuple(tuple); collector.emit(new Values(key, s)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java index e72735a..023b527 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java @@ -17,7 +17,7 @@ */ package org.apache.storm.redis.trident.state; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCluster; @@ -31,10 +31,10 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class); private final String redisKeyPrefix; - private final TridentTupleMapper tupleMapper; + private final TupleMapper tupleMapper; private final int expireIntervalSec; - public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) { + public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) { this.redisKeyPrefix = redisKeyPrefix; this.tupleMapper = tupleMapper; if (expireIntervalSec > 0) { @@ -52,12 +52,12 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState try { jedisCluster = redisClusterState.getJedisCluster(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTridentTuple(input); + String key = this.tupleMapper.getKeyFromTuple(input); String redisKey = key; if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { redisKey = redisKeyPrefix + redisKey; } - String value = this.tupleMapper.getValueFromTridentTuple(input); + String value = this.tupleMapper.getValueFromTuple(input); logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]"); http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java index f934cea..7f3edd1 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java @@ -22,7 +22,7 @@ import backtype.storm.tuple.Values; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java index f2fd624..2c7fd13 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java @@ -18,7 +18,7 @@ package org.apache.storm.redis.trident.state; import backtype.storm.task.IMetricsContext; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java index 051088e..294e83b 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java @@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state; import backtype.storm.tuple.Values; import com.google.common.collect.Lists; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; @@ -33,9 +33,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> { private static final Logger logger = LoggerFactory.getLogger(RedisState.class); private final String redisKeyPrefix; - private final TridentTupleMapper tupleMapper; + private final TupleMapper tupleMapper; - public RedisStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) { + public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) { this.redisKeyPrefix = redisKeyPrefix; this.tupleMapper = tupleMapper; } @@ -44,7 +44,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> { public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) { List<String> keys = Lists.newArrayList(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTridentTuple(input); + String key = this.tupleMapper.getKeyFromTuple(input); if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { key = redisKeyPrefix + key; } @@ -64,7 +64,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> { @Override public void execute(TridentTuple tuple, String s, TridentCollector collector) { - String key = this.tupleMapper.getKeyFromTridentTuple(tuple); + String key = this.tupleMapper.getKeyFromTuple(tuple); collector.emit(new Values(key, s)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java index 5b04d59..6b75f31 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java @@ -18,7 +18,7 @@ package org.apache.storm.redis.trident.state; import backtype.storm.tuple.Values; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; @@ -33,9 +33,9 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon private static final Logger logger = LoggerFactory.getLogger(RedisState.class); private final String redisKeyPrefix; - private final TridentTupleMapper tupleMapper; + private final TupleMapper tupleMapper; - public RedisStateSetCountQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) { + public RedisStateSetCountQuerier(String redisKeyPrefix, TupleMapper tupleMapper) { this.redisKeyPrefix = redisKeyPrefix; this.tupleMapper = tupleMapper; } @@ -48,7 +48,7 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon try { jedis = redisState.getJedis(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTridentTuple(input); + String key = this.tupleMapper.getKeyFromTuple(input); if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { key = redisKeyPrefix + key; } @@ -68,7 +68,7 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon @Override public void execute(TridentTuple tuple, Long s, TridentCollector collector) { - String key = this.tupleMapper.getKeyFromTridentTuple(tuple); + String key = this.tupleMapper.getKeyFromTuple(tuple); collector.emit(new Values(key, s)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java index c36d1f0..d7c43da 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java @@ -18,7 +18,7 @@ package org.apache.storm.redis.trident.state; import backtype.storm.tuple.Values; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; @@ -32,10 +32,10 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> { private static final Logger logger = LoggerFactory.getLogger(RedisState.class); private final String redisKeyPrefix; - private final TridentTupleMapper tupleMapper; + private final TupleMapper tupleMapper; private final int expireIntervalSec; - public RedisStateSetUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) { + public RedisStateSetUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) { this.redisKeyPrefix = redisKeyPrefix; this.tupleMapper = tupleMapper; if (expireIntervalSec > 0) { @@ -53,12 +53,12 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> { try { jedis = redisState.getJedis(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTridentTuple(input); + String key = this.tupleMapper.getKeyFromTuple(input); String redisKey = key; if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { redisKey = redisKeyPrefix + redisKey; } - String value = this.tupleMapper.getValueFromTridentTuple(input); + String value = this.tupleMapper.getValueFromTuple(input); logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]"); http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java index 67f7c51..664a222 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java @@ -17,7 +17,7 @@ */ package org.apache.storm.redis.trident.state; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; @@ -31,10 +31,10 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> { private static final Logger logger = LoggerFactory.getLogger(RedisState.class); private final String redisKeyPrefix; - private final TridentTupleMapper tupleMapper; + private final TupleMapper tupleMapper; private final int expireIntervalSec; - public RedisStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) { + public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) { this.redisKeyPrefix = redisKeyPrefix; this.tupleMapper = tupleMapper; if (expireIntervalSec > 0) { @@ -51,12 +51,12 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> { try { jedis = redisState.getJedis(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTridentTuple(input); + String key = this.tupleMapper.getKeyFromTuple(input); String redisKey = key; if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { redisKey = redisKeyPrefix + redisKey; } - String value = this.tupleMapper.getValueFromTridentTuple(input); + String value = this.tupleMapper.getValueFromTuple(input); logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]"); http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java deleted file mode 100644 index 355119a..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.util.config; - -import com.google.common.base.Preconditions; -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Protocol; - -import java.io.Serializable; -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.Set; - -public class JedisClusterConfig implements Serializable { - private Set<InetSocketAddress> nodes; - private int timeout; - private int maxRedirections; - - public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) { - this.nodes = nodes; - this.timeout = timeout; - this.maxRedirections = maxRedirections; - } - - public Set<HostAndPort> getNodes() { - Set<HostAndPort> ret = new HashSet<HostAndPort>(); - for (InetSocketAddress node : nodes) { - ret.add(new HostAndPort(node.getHostName(), node.getPort())); - } - return ret; - } - - public int getTimeout() { - return timeout; - } - - public int getMaxRedirections() { - return maxRedirections; - } - - public static class Builder { - private Set<InetSocketAddress> nodes; - private int timeout = Protocol.DEFAULT_TIMEOUT; - private int maxRedirections = 5; - - public Builder setNodes(Set<InetSocketAddress> nodes) { - this.nodes = nodes; - return this; - } - - public Builder setTimeout(int timeout) { - this.timeout = timeout; - return this; - } - - public Builder setMaxRedirections(int maxRedirections) { - this.maxRedirections = maxRedirections; - return this; - } - - public JedisClusterConfig build() { - Preconditions.checkNotNull(this.nodes, "Node information should be presented"); - - return new JedisClusterConfig(nodes, timeout, maxRedirections); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java deleted file mode 100644 index 9a42cf7..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.util.config; - -import redis.clients.jedis.Protocol; - -import java.io.Serializable; - -public class JedisPoolConfig implements Serializable { - public static final String DEFAULT_HOST = "127.0.0.1"; - - private String host; - private int port; - private int timeout; - private int database; - private String password; - - public JedisPoolConfig(String host, int port, int timeout, String password, int database) { - this.host = host; - this.port = port; - this.timeout = timeout; - this.database = database; - this.password = password; - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public int getTimeout() { - return timeout; - } - - public int getDatabase() { - return database; - } - - public String getPassword() { - return password; - } - - public static class Builder { - private String host = DEFAULT_HOST; - private int port = Protocol.DEFAULT_PORT; - private int timeout = Protocol.DEFAULT_TIMEOUT; - private int database = Protocol.DEFAULT_DATABASE; - private String password; - - public Builder setHost(String host) { - this.host = host; - return this; - } - - public Builder setPort(int port) { - this.port = port; - return this; - } - - public Builder setTimeout(int timeout) { - this.timeout = timeout; - return this; - } - - public Builder setDatabase(int database) { - this.database = database; - return this; - } - - public Builder setPassword(String password) { - this.password = password; - return this; - } - - public JedisPoolConfig build() { - return new JedisPoolConfig(host, port, timeout, password, database); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java deleted file mode 100644 index 5fd4115..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.util.container; - -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisCommands; - -import java.io.Closeable; - -public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable { - - private JedisCluster jedisCluster; - - public JedisClusterContainer(JedisCluster jedisCluster) { - this.jedisCluster = jedisCluster; - } - - @Override - public JedisCommands getInstance() { - return this.jedisCluster; - } - - @Override - public void returnInstance(JedisCommands jedisCommands) { - // do nothing - } - - @Override - public void close() { - this.jedisCluster.close(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java deleted file mode 100644 index 8d2dd38..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.util.container; - -import org.apache.storm.redis.util.config.JedisClusterConfig; -import org.apache.storm.redis.util.config.JedisPoolConfig; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.JedisPool; - -public class JedisCommandsContainerBuilder { - - public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); - - public static JedisCommandsInstanceContainer build(JedisPoolConfig config) { - JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase()); - return new JedisContainer(jedisPool); - } - - public static JedisCommandsInstanceContainer build(JedisClusterConfig config) { - JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG); - return new JedisClusterContainer(jedisCluster); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java deleted file mode 100644 index 847d6a5..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.util.container; - -import redis.clients.jedis.JedisCommands; - -public interface JedisCommandsInstanceContainer { - JedisCommands getInstance(); - void returnInstance(JedisCommands jedisCommands); -} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java deleted file mode 100644 index e75cccc..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.util.container; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisCommands; -import redis.clients.jedis.JedisPool; - -import java.io.Closeable; -import java.io.IOException; - -public class JedisContainer implements JedisCommandsInstanceContainer, Closeable { - private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class); - - private JedisPool jedisPool; - - public JedisContainer(JedisPool jedisPool) { - this.jedisPool = jedisPool; - } - - @Override - public JedisCommands getInstance() { - return jedisPool.getResource(); - } - - @Override - public void returnInstance(JedisCommands jedisCommands) { - if (jedisCommands == null) { - return; - } - - try { - ((Closeable) jedisCommands).close(); - } catch (IOException e) { - LOG.warn("Failed to close (return) instance to pool"); - try { - jedisPool.returnBrokenResource((Jedis) jedisCommands); - } catch (Exception e2) { - LOG.error("Failed to discard instance from pool"); - } - } - } - - @Override - public void close() { - jedisPool.close(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java index a62fdff..ae053de 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java @@ -20,72 +20,65 @@ package org.apache.storm.redis.topology; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; +import backtype.storm.tuple.ITuple; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; -import org.apache.storm.redis.bolt.AbstractRedisBolt; -import org.apache.storm.redis.util.config.JedisClusterConfig; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import com.google.common.collect.Lists; +import org.apache.storm.redis.bolt.RedisLookupBolt; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.JedisCommands; -import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Random; public class LookupWordCount { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String LOOKUP_BOLT = "LOOKUP_BOLT"; + private static final String PRINT_BOLT = "PRINT_BOLT"; private static final String TEST_REDIS_HOST = "127.0.0.1"; private static final int TEST_REDIS_PORT = 6379; - public static class LookupWordTotalCountBolt extends AbstractRedisBolt { - private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class); + public static class PrintWordTotalCountBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class); private static final Random RANDOM = new Random(); + private OutputCollector collector; - public LookupWordTotalCountBolt(JedisPoolConfig config) { - super(config); - } - - public LookupWordTotalCountBolt(JedisClusterConfig config) { - super(config); + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; } @Override public void execute(Tuple input) { - JedisCommands jedisCommands = null; - try { - jedisCommands = getInstance(); - String wordName = input.getStringByField("word"); - String countStr = jedisCommands.get(wordName); + String wordName = input.getStringByField("wordName"); + String countStr = input.getStringByField("count"); + + // print lookup result with low probability + if(RANDOM.nextInt(1000) > 995) { + int count = 0; if (countStr != null) { - int count = Integer.parseInt(countStr); - this.collector.emit(new Values(wordName, count)); - - // print lookup result with low probability - if(RANDOM.nextInt(1000) > 995) { - LOG.info("Lookup result - word : " + wordName + " / count : " + count); - } - } else { - // skip - LOG.warn("Word not found in Redis - word : " + wordName); + count = Integer.parseInt(countStr); } - } finally { - if (jedisCommands != null) { - returnInstance(jedisCommands); - } - this.collector.ack(input); + LOG.info("Lookup result - word : " + wordName + " / count : " + count); } + + collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - // wordName, count - declarer.declare(new Fields("wordName", "count")); } } @@ -104,12 +97,16 @@ public class LookupWordCount { .setHost(host).setPort(port).build(); WordSpout spout = new WordSpout(); - LookupWordTotalCountBolt redisLookupBolt = new LookupWordTotalCountBolt(poolConfig); + RedisLookupMapper lookupMapper = setupLookupMapper(); + RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper); + + PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt(); //wordspout -> lookupbolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 1); - builder.setBolt(LOOKUP_BOLT, redisLookupBolt, 1).shuffleGrouping(WORD_SPOUT); + builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT); + builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT); if (args.length == 2) { LocalCluster cluster = new LocalCluster(); @@ -124,4 +121,46 @@ public class LookupWordCount { System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)"); } } + + private static RedisLookupMapper setupLookupMapper() { + return new WordCountRedisLookupMapper(); + } + + private static class WordCountRedisLookupMapper implements RedisLookupMapper { + private RedisDataTypeDescription description; + private final String hashKey = "wordCount"; + + public WordCountRedisLookupMapper() { + description = new RedisDataTypeDescription( + RedisDataTypeDescription.RedisDataType.HASH, hashKey); + } + + @Override + public List<Values> toTuple(ITuple input, Object value) { + String member = getKeyFromTuple(input); + List<Values> values = Lists.newArrayList(); + values.add(new Values(member, value)); + return values; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("wordName", "count")); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return description; + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return tuple.getStringByField("word"); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java index 535d7b9..14a969d 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java @@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; +import backtype.storm.tuple.ITuple; import backtype.storm.tuple.Tuple; import org.apache.storm.redis.bolt.AbstractRedisBolt; -import org.apache.storm.redis.util.config.JedisClusterConfig; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.bolt.RedisStoreBolt; +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCommands; @@ -36,7 +40,7 @@ import redis.clients.jedis.exceptions.JedisException; public class PersistentWordCount { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String COUNT_BOLT = "COUNT_BOLT"; - private static final String REDIS_BOLT = "REDIS_BOLT"; + private static final String STORE_BOLT = "STORE_BOLT"; private static final String TEST_REDIS_HOST = "127.0.0.1"; private static final int TEST_REDIS_PORT = 6379; @@ -92,14 +96,15 @@ public class PersistentWordCount { WordSpout spout = new WordSpout(); WordCounter bolt = new WordCounter(); - StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig); + RedisStoreMapper storeMapper = setupStoreMapper(); + RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); // wordSpout ==> countBolt ==> RedisBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 1); - builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); - builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); + builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word")); + builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT); if (args.length == 2) { LocalCluster cluster = new LocalCluster(); @@ -114,4 +119,33 @@ public class PersistentWordCount { System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)"); } } + + private static RedisStoreMapper setupStoreMapper() { + return new WordCountStoreMapper(); + } + + private static class WordCountStoreMapper implements RedisStoreMapper { + private RedisDataTypeDescription description; + private final String hashKey = "wordCount"; + + public WordCountStoreMapper() { + description = new RedisDataTypeDescription( + RedisDataTypeDescription.RedisDataType.HASH, hashKey); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return description; + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return tuple.getStringByField("word"); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return tuple.getStringByField("count"); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java index 6a0548d..6f25038 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java @@ -23,23 +23,32 @@ import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import com.google.common.collect.Maps; import java.util.Map; import static backtype.storm.utils.Utils.tuple; public class WordCounter implements IBasicBolt { - + private Map<String, Integer> wordCounter = Maps.newHashMap(); @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context) { } - /* - * Just output the word value with a count of 1. - */ public void execute(Tuple input, BasicOutputCollector collector) { - collector.emit(tuple(input.getValues().get(0), 1)); + String word = input.getStringByField("word"); + int count; + if (wordCounter.containsKey(word)) { + count = wordCounter.get(word) + 1; + wordCounter.put(word, wordCounter.get(word) + 1); + } else { + count = 1; + } + + wordCounter.put(word, count); + collector.emit(new Values(word, String.valueOf(count))); } public void cleanup() { http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java index 9a28cb7..8b6ebc5 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java @@ -23,15 +23,14 @@ import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.apache.storm.redis.trident.state.RedisState; import org.apache.storm.redis.trident.state.RedisStateQuerier; import org.apache.storm.redis.trident.state.RedisStateUpdater; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; import storm.trident.Stream; import storm.trident.TridentState; import storm.trident.TridentTopology; -import storm.trident.state.StateFactory; import storm.trident.testing.FixedBatchSpout; public class WordCountTridentRedis { @@ -48,7 +47,7 @@ public class WordCountTridentRedis { JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); - TridentTupleMapper tupleMapper = new WordCountTupleMapper(); + TupleMapper tupleMapper = new WordCountTupleMapper(); RedisState.Factory factory = new RedisState.Factory(poolConfig); TridentTopology topology = new TridentTopology();
