[storm-redis] Remove unused and buggy feature * We can introduce counter / various data types later
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c75de7e7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c75de7e7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c75de7e7 Branch: refs/heads/nimbus-ha-branch Commit: c75de7e7e63f282f44306d7bd0635a7a9be5e925 Parents: a8a0dfd Author: Jungtaek Lim <[email protected]> Authored: Wed Mar 25 18:51:07 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Wed Mar 25 18:51:07 2015 +0900 ---------------------------------------------------------------------- .../state/RedisStateSetCountQuerier.java | 74 ------------------ .../trident/state/RedisStateSetUpdater.java | 80 -------------------- 2 files changed, 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c75de7e7/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java deleted file mode 100644 index 6b75f31..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.trident.state; - -import backtype.storm.tuple.Values; -import org.apache.storm.redis.common.mapper.TupleMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseQueryFunction; -import storm.trident.tuple.TridentTuple; - -import java.util.ArrayList; -import java.util.List; - -public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Long> { - private static final Logger logger = LoggerFactory.getLogger(RedisState.class); - - private final String redisKeyPrefix; - private final TupleMapper tupleMapper; - - public RedisStateSetCountQuerier(String redisKeyPrefix, TupleMapper tupleMapper) { - this.redisKeyPrefix = redisKeyPrefix; - this.tupleMapper = tupleMapper; - } - - @Override - public List<Long> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) { - List<Long> ret = new ArrayList<Long>(); - - Jedis jedis = null; - try { - jedis = redisState.getJedis(); - for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTuple(input); - if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { - key = redisKeyPrefix + key; - } - long count = jedis.scard(key); - ret.add(count); - - logger.debug("redis get key[" + key + "] count[" + count + "]"); - } - } finally { - if (jedis != null) { - redisState.returnJedis(jedis); - } - } - - return ret; - } - - @Override - public void execute(TridentTuple tuple, Long s, TridentCollector collector) { - String key = this.tupleMapper.getKeyFromTuple(tuple); - collector.emit(new Values(key, s)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/c75de7e7/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java deleted file mode 100644 index d7c43da..0000000 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.redis.trident.state; - -import backtype.storm.tuple.Values; -import org.apache.storm.redis.common.mapper.TupleMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseStateUpdater; -import storm.trident.tuple.TridentTuple; - -import java.util.List; - -public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> { - private static final Logger logger = LoggerFactory.getLogger(RedisState.class); - - private final String redisKeyPrefix; - private final TupleMapper tupleMapper; - private final int expireIntervalSec; - - public RedisStateSetUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) { - this.redisKeyPrefix = redisKeyPrefix; - this.tupleMapper = tupleMapper; - if (expireIntervalSec > 0) { - this.expireIntervalSec = expireIntervalSec; - } else { - this.expireIntervalSec = 0; - } - } - - @Override - public void updateState(RedisState redisState, List<TridentTuple> inputs, - TridentCollector collector) { - - Jedis jedis = null; - try { - jedis = redisState.getJedis(); - for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTuple(input); - String redisKey = key; - if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { - redisKey = redisKeyPrefix + redisKey; - } - String value = this.tupleMapper.getValueFromTuple(input); - - logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]"); - - if (this.expireIntervalSec > 0) { - jedis.setex(redisKey, expireIntervalSec, value); - } else { - jedis.set(redisKey, value); - } - Long count = jedis.scard(redisKey); - - collector.emit(new Values(key, count)); - } - } finally { - if (jedis != null) { - redisState.returnJedis(jedis); - } - } - } -}
