http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java index 3442c46..a65790a 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java @@ -1,28 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.redis.trident.state; +import java.io.Serializable; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.trident.state.Serializer; -import java.io.Serializable; - /** * Options of State.<br/> * It's a data structure (whole things are public) and you can access and modify all fields. @@ -30,7 +23,8 @@ import java.io.Serializable; * @param <T> value's type class */ public class Options<T> implements Serializable { - private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING); + private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = + new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING); public int localCacheSize = 1000; public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java index ecae18d..c43a18d 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.redis.trident.state; -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; +import java.util.List; +import java.util.Map; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; -import redis.clients.jedis.JedisCluster; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.OpaqueValue; import org.apache.storm.trident.state.Serializer; import org.apache.storm.trident.state.State; @@ -36,9 +30,8 @@ import org.apache.storm.trident.state.map.NonTransactionalMap; import org.apache.storm.trident.state.map.OpaqueMap; import org.apache.storm.trident.state.map.SnapshottableMap; import org.apache.storm.trident.state.map.TransactionalMap; - -import java.util.List; -import java.util.Map; +import org.apache.storm.tuple.Values; +import redis.clients.jedis.JedisCluster; /** * IBackingMap implementation for Redis Cluster environment. @@ -47,6 +40,27 @@ import java.util.Map; * @see AbstractRedisMapState */ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { + private JedisCluster jedisCluster; + private Options options; + private Serializer serializer; + private KeyFactory keyFactory; + + /** + * Constructor + * + * @param jedisCluster JedisCluster + * @param options options of State + * @param serializer Serializer + * @param keyFactory KeyFactory + */ + public RedisClusterMapState(JedisCluster jedisCluster, Options options, + Serializer<T> serializer, KeyFactory keyFactory) { + this.jedisCluster = jedisCluster; + this.options = options; + this.serializer = serializer; + this.keyFactory = keyFactory; + } + /** * Provides StateFactory for opaque transactional. * @@ -189,98 +203,6 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { } /** - * RedisClusterMapState.Factory provides Redis Cluster environment version of StateFactory. - */ - protected static class Factory implements StateFactory { - public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); - - JedisClusterConfig jedisClusterConfig; - - StateType type; - Serializer serializer; - KeyFactory keyFactory; - Options options; - - /** - * Constructor - * - * @param jedisClusterConfig configuration for JedisCluster - * @param type StateType - * @param options options of State - */ - public Factory(JedisClusterConfig jedisClusterConfig, StateType type, Options options) { - this.jedisClusterConfig = jedisClusterConfig; - this.type = type; - this.options = options; - - this.keyFactory = options.keyFactory; - if (this.keyFactory == null) { - this.keyFactory = new KeyFactory.DefaultKeyFactory(); - } - this.serializer = options.serializer; - if (this.serializer == null) { - this.serializer = DEFAULT_SERIALIZERS.get(type); - if (this.serializer == null) { - throw new RuntimeException("Couldn't find serializer for state type: " + type); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), - jedisClusterConfig.getTimeout(), - jedisClusterConfig.getTimeout(), - jedisClusterConfig.getMaxRedirections(), - jedisClusterConfig.getPassword(), - DEFAULT_POOL_CONFIG); - - RedisClusterMapState state = new RedisClusterMapState(jedisCluster, options, serializer, keyFactory); - CachedMap c = new CachedMap(state, options.localCacheSize); - - MapState ms; - if (type == StateType.NON_TRANSACTIONAL) { - ms = NonTransactionalMap.build(c); - - } else if (type == StateType.OPAQUE) { - ms = OpaqueMap.build(c); - - } else if (type == StateType.TRANSACTIONAL) { - ms = TransactionalMap.build(c); - - } else { - throw new RuntimeException("Unknown state type: " + type); - } - - return new SnapshottableMap(ms, new Values(options.globalKey)); - } - } - - private JedisCluster jedisCluster; - private Options options; - private Serializer serializer; - private KeyFactory keyFactory; - - /** - * Constructor - * - * @param jedisCluster JedisCluster - * @param options options of State - * @param serializer Serializer - * @param keyFactory KeyFactory - */ - public RedisClusterMapState(JedisCluster jedisCluster, Options options, - Serializer<T> serializer, KeyFactory keyFactory) { - this.jedisCluster = jedisCluster; - this.options = options; - this.serializer = serializer; - this.keyFactory = keyFactory; - } - - /** * {@inheritDoc} */ @Override @@ -350,4 +272,75 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> { throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType()); } } + + /** + * RedisClusterMapState.Factory provides Redis Cluster environment version of StateFactory. + */ + protected static class Factory implements StateFactory { + public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); + + JedisClusterConfig jedisClusterConfig; + + StateType type; + Serializer serializer; + KeyFactory keyFactory; + Options options; + + /** + * Constructor + * + * @param jedisClusterConfig configuration for JedisCluster + * @param type StateType + * @param options options of State + */ + public Factory(JedisClusterConfig jedisClusterConfig, StateType type, Options options) { + this.jedisClusterConfig = jedisClusterConfig; + this.type = type; + this.options = options; + + this.keyFactory = options.keyFactory; + if (this.keyFactory == null) { + this.keyFactory = new KeyFactory.DefaultKeyFactory(); + } + this.serializer = options.serializer; + if (this.serializer == null) { + this.serializer = DEFAULT_SERIALIZERS.get(type); + if (this.serializer == null) { + throw new RuntimeException("Couldn't find serializer for state type: " + type); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), + jedisClusterConfig.getTimeout(), + jedisClusterConfig.getTimeout(), + jedisClusterConfig.getMaxRedirections(), + jedisClusterConfig.getPassword(), + DEFAULT_POOL_CONFIG); + + RedisClusterMapState state = new RedisClusterMapState(jedisCluster, options, serializer, keyFactory); + CachedMap c = new CachedMap(state, options.localCacheSize); + + MapState ms; + if (type == StateType.NON_TRANSACTIONAL) { + ms = NonTransactionalMap.build(c); + + } else if (type == StateType.OPAQUE) { + ms = OpaqueMap.build(c); + + } else if (type == StateType.TRANSACTIONAL) { + ms = TransactionalMap.build(c); + + } else { + throw new RuntimeException("Unknown state type: " + type); + } + + return new SnapshottableMap(ms, new Values(options.globalKey)); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java index 11d8907..7bdd86b 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java @@ -1,37 +1,39 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.redis.trident.state; -import org.apache.storm.task.IMetricsContext; +import java.util.Map; import org.apache.storm.redis.common.config.JedisClusterConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.JedisCluster; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; - -import java.util.Map; +import redis.clients.jedis.JedisCluster; /** * Implementation of State for Redis Cluster environment. */ public class RedisClusterState implements State { + private JedisCluster jedisCluster; + + /** + * Constructor + * + * @param jedisCluster JedisCluster + */ + public RedisClusterState(JedisCluster jedisCluster) { + this.jedisCluster = jedisCluster; + } + /** * {@inheritDoc} */ @@ -47,6 +49,26 @@ public class RedisClusterState implements State { } /** + * Borrows JedisCluster instance. + * <p/> + * Note that you should return borrowed instance when you finish using instance. + * + * @return JedisCluster instance + */ + public JedisCluster getJedisCluster() { + return this.jedisCluster; + } + + /** + * Returns JedisCluster instance to pool. + * + * @param jedisCluster JedisCluster instance to return to pool + */ + public void returnJedisCluster(JedisCluster jedisCluster) { + //do nothing + } + + /** * RedisClusterState.Factory implements StateFactory for Redis Cluster environment. * * @see StateFactory @@ -71,45 +93,14 @@ public class RedisClusterState implements State { @Override public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), - jedisClusterConfig.getTimeout(), - jedisClusterConfig.getTimeout(), - jedisClusterConfig.getMaxRedirections(), - jedisClusterConfig.getPassword(), - DEFAULT_POOL_CONFIG); + jedisClusterConfig.getTimeout(), + jedisClusterConfig.getTimeout(), + jedisClusterConfig.getMaxRedirections(), + jedisClusterConfig.getPassword(), + DEFAULT_POOL_CONFIG); return new RedisClusterState(jedisCluster); } } - private JedisCluster jedisCluster; - - /** - * Constructor - * - * @param jedisCluster JedisCluster - */ - public RedisClusterState(JedisCluster jedisCluster) { - this.jedisCluster = jedisCluster; - } - - /** - * Borrows JedisCluster instance. - * <p/> - * Note that you should return borrowed instance when you finish using instance. - * - * @return JedisCluster instance - */ - public JedisCluster getJedisCluster() { - return this.jedisCluster; - } - - /** - * Returns JedisCluster instance to pool. - * - * @param jedisCluster JedisCluster instance to return to pool - */ - public void returnJedisCluster(JedisCluster jedisCluster) { - //do nothing - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java index 6938409..b7401aa 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java @@ -1,28 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.redis.trident.state; -import org.apache.storm.redis.common.mapper.RedisLookupMapper; -import redis.clients.jedis.JedisCluster; - import java.util.ArrayList; import java.util.List; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; +import redis.clients.jedis.JedisCluster; /** * BaseQueryFunction implementation for Redis Cluster environment. http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java index 1cc0725..0258ae5 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.redis.trident.state; +import java.util.Map; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import redis.clients.jedis.JedisCluster; -import java.util.Map; - /** * BaseStateUpdater implementation for Redis Cluster environment. * @@ -63,25 +57,25 @@ public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClu String value = kvEntry.getValue(); switch (dataType) { - case STRING: - if (this.expireIntervalSec > 0) { - jedisCluster.setex(key, expireIntervalSec, value); - } else { - jedisCluster.set(key, value); - } - break; - case HASH: - jedisCluster.hset(additionalKey, key, value); - break; - default: - throw new IllegalArgumentException("Cannot process such data type: " + dataType); + case STRING: + if (this.expireIntervalSec > 0) { + jedisCluster.setex(key, expireIntervalSec, value); + } else { + jedisCluster.set(key, value); + } + break; + case HASH: + jedisCluster.hset(additionalKey, key, value); + break; + default: + throw new IllegalArgumentException("Cannot process such data type: " + dataType); } } // send expire command for hash only once // it expires key itself entirely, so use it with caution if (dataType == RedisDataTypeDescription.RedisDataType.HASH && - this.expireIntervalSec > 0) { + this.expireIntervalSec > 0) { jedisCluster.expire(additionalKey, expireIntervalSec); } } finally { http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java index f68b5ef..6febc41 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java @@ -1,29 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.redis.trident.state; -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.tuple.Values; +import java.util.List; +import java.util.Map; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.Pipeline; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.OpaqueValue; import org.apache.storm.trident.state.Serializer; import org.apache.storm.trident.state.State; @@ -36,9 +29,10 @@ import org.apache.storm.trident.state.map.NonTransactionalMap; import org.apache.storm.trident.state.map.OpaqueMap; import org.apache.storm.trident.state.map.SnapshottableMap; import org.apache.storm.trident.state.map.TransactionalMap; - -import java.util.List; -import java.util.Map; +import org.apache.storm.tuple.Values; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.Pipeline; /** * IBackingMap implementation for single Redis environment. @@ -47,6 +41,27 @@ import java.util.Map; * @see AbstractRedisMapState */ public class RedisMapState<T> extends AbstractRedisMapState<T> { + private JedisPool jedisPool; + private Options options; + private Serializer serializer; + private KeyFactory keyFactory; + + /** + * Constructor + * + * @param jedisPool JedisPool + * @param options options of State + * @param serializer Serializer + * @param keyFactory KeyFactory + */ + public RedisMapState(JedisPool jedisPool, Options options, + Serializer<T> serializer, KeyFactory keyFactory) { + this.jedisPool = jedisPool; + this.options = options; + this.serializer = serializer; + this.keyFactory = keyFactory; + } + /** * Provides StateFactory for opaque transactional. * @@ -189,97 +204,6 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { } /** - * RedisMapState.Factory provides single Redis environment version of StateFactory. - */ - protected static class Factory implements StateFactory { - public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); - - JedisPoolConfig jedisPoolConfig; - - StateType type; - Serializer serializer; - KeyFactory keyFactory; - Options options; - - /** - * Constructor - * - * @param jedisPoolConfig configuration for JedisPool - * @param type StateType - * @param options options of State - */ - public Factory(JedisPoolConfig jedisPoolConfig, StateType type, Options options) { - this.jedisPoolConfig = jedisPoolConfig; - this.type = type; - this.options = options; - - this.keyFactory = options.keyFactory; - if (this.keyFactory == null) { - this.keyFactory = new KeyFactory.DefaultKeyFactory(); - } - this.serializer = options.serializer; - if (this.serializer == null) { - this.serializer = DEFAULT_SERIALIZERS.get(type); - if (this.serializer == null) { - throw new RuntimeException("Couldn't find serializer for state type: " + type); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, - jedisPoolConfig.getHost(), - jedisPoolConfig.getPort(), - jedisPoolConfig.getTimeout(), - jedisPoolConfig.getPassword(), - jedisPoolConfig.getDatabase()); - RedisMapState state = new RedisMapState(jedisPool, options, serializer, keyFactory); - CachedMap c = new CachedMap(state, options.localCacheSize); - - MapState ms; - if (type == StateType.NON_TRANSACTIONAL) { - ms = NonTransactionalMap.build(c); - - } else if (type == StateType.OPAQUE) { - ms = OpaqueMap.build(c); - - } else if (type == StateType.TRANSACTIONAL) { - ms = TransactionalMap.build(c); - - } else { - throw new RuntimeException("Unknown state type: " + type); - } - - return new SnapshottableMap(ms, new Values(options.globalKey)); - } - } - - private JedisPool jedisPool; - private Options options; - private Serializer serializer; - private KeyFactory keyFactory; - - /** - * Constructor - * - * @param jedisPool JedisPool - * @param options options of State - * @param serializer Serializer - * @param keyFactory KeyFactory - */ - public RedisMapState(JedisPool jedisPool, Options options, - Serializer<T> serializer, KeyFactory keyFactory) { - this.jedisPool = jedisPool; - this.options = options; - this.serializer = serializer; - this.keyFactory = keyFactory; - } - - /** * {@inheritDoc} */ @Override @@ -308,14 +232,14 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { RedisDataTypeDescription description = this.options.dataTypeDescription; switch (description.getDataType()) { - case STRING: - return jedis.mget(stringKeys); + case STRING: + return jedis.mget(stringKeys); - case HASH: - return jedis.hmget(description.getAdditionalKey(), stringKeys); + case HASH: + return jedis.hmget(description.getAdditionalKey(), stringKeys); - default: - throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType()); + default: + throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType()); } } finally { @@ -337,27 +261,27 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { RedisDataTypeDescription description = this.options.dataTypeDescription; switch (description.getDataType()) { - case STRING: - String[] keyValue = buildKeyValuesList(keyValues); - jedis.mset(keyValue); - if(this.options.expireIntervalSec > 0){ - Pipeline pipe = jedis.pipelined(); - for(int i = 0; i < keyValue.length; i += 2){ - pipe.expire(keyValue[i], this.options.expireIntervalSec); + case STRING: + String[] keyValue = buildKeyValuesList(keyValues); + jedis.mset(keyValue); + if (this.options.expireIntervalSec > 0) { + Pipeline pipe = jedis.pipelined(); + for (int i = 0; i < keyValue.length; i += 2) { + pipe.expire(keyValue[i], this.options.expireIntervalSec); + } + pipe.sync(); } - pipe.sync(); - } - break; + break; - case HASH: - jedis.hmset(description.getAdditionalKey(), keyValues); - if (this.options.expireIntervalSec > 0) { - jedis.expire(description.getAdditionalKey(), this.options.expireIntervalSec); - } - break; + case HASH: + jedis.hmset(description.getAdditionalKey(), keyValues); + if (this.options.expireIntervalSec > 0) { + jedis.expire(description.getAdditionalKey(), this.options.expireIntervalSec); + } + break; - default: - throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType()); + default: + throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType()); } } finally { @@ -378,4 +302,74 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> { return keyValueLists; } + + /** + * RedisMapState.Factory provides single Redis environment version of StateFactory. + */ + protected static class Factory implements StateFactory { + public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig(); + + JedisPoolConfig jedisPoolConfig; + + StateType type; + Serializer serializer; + KeyFactory keyFactory; + Options options; + + /** + * Constructor + * + * @param jedisPoolConfig configuration for JedisPool + * @param type StateType + * @param options options of State + */ + public Factory(JedisPoolConfig jedisPoolConfig, StateType type, Options options) { + this.jedisPoolConfig = jedisPoolConfig; + this.type = type; + this.options = options; + + this.keyFactory = options.keyFactory; + if (this.keyFactory == null) { + this.keyFactory = new KeyFactory.DefaultKeyFactory(); + } + this.serializer = options.serializer; + if (this.serializer == null) { + this.serializer = DEFAULT_SERIALIZERS.get(type); + if (this.serializer == null) { + throw new RuntimeException("Couldn't find serializer for state type: " + type); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, + jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), + jedisPoolConfig.getTimeout(), + jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + RedisMapState state = new RedisMapState(jedisPool, options, serializer, keyFactory); + CachedMap c = new CachedMap(state, options.localCacheSize); + + MapState ms; + if (type == StateType.NON_TRANSACTIONAL) { + ms = NonTransactionalMap.build(c); + + } else if (type == StateType.OPAQUE) { + ms = OpaqueMap.build(c); + + } else if (type == StateType.TRANSACTIONAL) { + ms = TransactionalMap.build(c); + + } else { + throw new RuntimeException("Unknown state type: " + type); + } + + return new SnapshottableMap(ms, new Values(options.globalKey)); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java index ec0b679..76b60f7 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java @@ -1,37 +1,40 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.redis.trident.state; -import org.apache.storm.task.IMetricsContext; +import java.util.Map; import org.apache.storm.redis.common.config.JedisPoolConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; - -import java.util.Map; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; /** * Implementation of State for single Redis environment. */ public class RedisState implements State { + private JedisPool jedisPool; + + /** + * Constructor + * + * @param jedisPool JedisPool + */ + public RedisState(JedisPool jedisPool) { + this.jedisPool = jedisPool; + } + /** * {@inheritDoc} */ @@ -47,6 +50,26 @@ public class RedisState implements State { } /** + * Borrows Jedis instance from pool. + * <p/> + * Note that you should return borrowed instance to pool when you finish using instance. + * + * @return Jedis instance + */ + public Jedis getJedis() { + return this.jedisPool.getResource(); + } + + /** + * Returns Jedis instance to pool. + * + * @param jedis Jedis instance to return to pool + */ + public void returnJedis(Jedis jedis) { + jedis.close(); + } + + /** * RedisState.Factory implements StateFactory for single Redis environment. * * @see StateFactory @@ -81,35 +104,4 @@ public class RedisState implements State { } } - private JedisPool jedisPool; - - /** - * Constructor - * - * @param jedisPool JedisPool - */ - public RedisState(JedisPool jedisPool) { - this.jedisPool = jedisPool; - } - - /** - * Borrows Jedis instance from pool. - * <p/> - * Note that you should return borrowed instance to pool when you finish using instance. - * - * @return Jedis instance - */ - public Jedis getJedis() { - return this.jedisPool.getResource(); - } - - /** - * Returns Jedis instance to pool. - * - * @param jedis Jedis instance to return to pool - */ - public void returnJedis(Jedis jedis) { - jedis.close(); - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java index d76fec2..8be00e2 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.redis.trident.state; +import java.util.List; import org.apache.storm.redis.common.mapper.RedisLookupMapper; import redis.clients.jedis.Jedis; -import java.util.List; - /** * BaseQueryFunction implementation for single Redis environment. * @@ -49,14 +43,14 @@ public class RedisStateQuerier extends AbstractRedisStateQuerier<RedisState> { String[] keysForRedis = keys.toArray(new String[keys.size()]); switch (dataType) { - case STRING: - redisVals = jedis.mget(keysForRedis); - break; - case HASH: - redisVals = jedis.hmget(additionalKey, keysForRedis); - break; - default: - throw new IllegalArgumentException("Cannot process such data type: " + dataType); + case STRING: + redisVals = jedis.mget(keysForRedis); + break; + case HASH: + redisVals = jedis.hmget(additionalKey, keysForRedis); + break; + default: + throw new IllegalArgumentException("Cannot process such data type: " + dataType); } return redisVals; http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java index d0507cf..0a5aede 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.redis.trident.state; +import java.util.Map; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; -import java.util.Map; - /** * BaseStateUpdater implementation for single Redis environment. * @@ -65,25 +59,25 @@ public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> { String value = kvEntry.getValue(); switch (dataType) { - case STRING: - if (this.expireIntervalSec > 0) { - pipeline.setex(key, expireIntervalSec, value); - } else { - pipeline.set(key, value); - } - break; - case HASH: - pipeline.hset(additionalKey, key, value); - break; - default: - throw new IllegalArgumentException("Cannot process such data type: " + dataType); + case STRING: + if (this.expireIntervalSec > 0) { + pipeline.setex(key, expireIntervalSec, value); + } else { + pipeline.set(key, value); + } + break; + case HASH: + pipeline.hset(additionalKey, key, value); + break; + default: + throw new IllegalArgumentException("Cannot process such data type: " + dataType); } } // send expire command for hash only once // it expires key itself entirely, so use it with caution if (dataType == RedisDataTypeDescription.RedisDataType.HASH && - this.expireIntervalSec > 0) { + this.expireIntervalSec > 0) { pipeline.expire(additionalKey, expireIntervalSec); } http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java index 565a250..fb0052d 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java @@ -1,24 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.redis.state; import com.google.common.primitives.UnsignedBytes; +import java.util.ArrayList; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.storm.redis.common.adapter.RedisCommandsAdapterJedis; import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer; import org.apache.storm.state.DefaultStateEncoder; @@ -29,11 +27,6 @@ import org.junit.Test; import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; -import java.util.ArrayList; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -79,15 +72,15 @@ public class RedisKeyValueStateIteratorTest { putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), "value2".getBytes()); ScanResult<Map.Entry<byte[], byte[]>> scanResultFirst = new ScanResult<>( - "12345".getBytes(), new ArrayList<>(chunkMap.entrySet())); + "12345".getBytes(), new ArrayList<>(chunkMap.entrySet())); ScanResult<Map.Entry<byte[], byte[]>> scanResultSecond = new ScanResult<>( - ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<Map.Entry<byte[], byte[]>>()); + ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<Map.Entry<byte[], byte[]>>()); when(mockJedis.hscan(eq(namespace), any(byte[].class), any(ScanParams.class))) - .thenReturn(scanResultFirst, scanResultSecond); + .thenReturn(scanResultFirst, scanResultSecond); RedisKeyValueStateIterator<byte[], byte[]> kvIterator = - new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes()); @@ -112,17 +105,17 @@ public class RedisKeyValueStateIteratorTest { putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), "value2".getBytes()); ScanResult<Map.Entry<byte[], byte[]>> scanResultFirst = new ScanResult<>( - "12345".getBytes(), new ArrayList<Map.Entry<byte[], byte[]>>()); + "12345".getBytes(), new ArrayList<Map.Entry<byte[], byte[]>>()); ScanResult<Map.Entry<byte[], byte[]>> scanResultSecond = new ScanResult<>( - "23456".getBytes(), new ArrayList<Map.Entry<byte[], byte[]>>()); + "23456".getBytes(), new ArrayList<Map.Entry<byte[], byte[]>>()); ScanResult<Map.Entry<byte[], byte[]>> scanResultThird = new ScanResult<>( - ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<>(chunkMap.entrySet())); + ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<>(chunkMap.entrySet())); when(mockJedis.hscan(eq(namespace), any(byte[].class), any(ScanParams.class))) - .thenReturn(scanResultFirst, scanResultSecond, scanResultThird); + .thenReturn(scanResultFirst, scanResultSecond, scanResultThird); RedisKeyValueStateIterator<byte[], byte[]> kvIterator = - new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes()); @@ -152,17 +145,17 @@ public class RedisKeyValueStateIteratorTest { putEncodedKeyValueToMap(chunkMap2, "key4".getBytes(), "value4".getBytes()); ScanResult<Map.Entry<byte[], byte[]>> scanResultFirst = new ScanResult<>( - "12345".getBytes(), new ArrayList<>(chunkMap.entrySet())); + "12345".getBytes(), new ArrayList<>(chunkMap.entrySet())); ScanResult<Map.Entry<byte[], byte[]>> scanResultSecond = new ScanResult<>( - "23456".getBytes(), new ArrayList<>(chunkMap2.entrySet())); + "23456".getBytes(), new ArrayList<>(chunkMap2.entrySet())); ScanResult<Map.Entry<byte[], byte[]>> scanResultThird = new ScanResult<>( - ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<Map.Entry<byte[], byte[]>>()); + ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<Map.Entry<byte[], byte[]>>()); when(mockJedis.hscan(eq(namespace), any(byte[].class), any(ScanParams.class))) - .thenReturn(scanResultFirst, scanResultSecond, scanResultThird); + .thenReturn(scanResultFirst, scanResultSecond, scanResultThird); RedisKeyValueStateIterator<byte[], byte[]> kvIterator = - new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); // keys shouldn't appear twice @@ -184,13 +177,13 @@ public class RedisKeyValueStateIteratorTest { NavigableMap<byte[], byte[]> pendingCommit = getBinaryTreeMap(); ScanResult<Map.Entry<byte[], byte[]>> scanResult = new ScanResult<>( - ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<Map.Entry<byte[], byte[]>>()); + ScanParams.SCAN_POINTER_START_BINARY, new ArrayList<Map.Entry<byte[], byte[]>>()); when(mockJedis.hscan(eq(namespace), any(byte[].class), any(ScanParams.class))) - .thenReturn(scanResult); + .thenReturn(scanResult); RedisKeyValueStateIterator<byte[], byte[]> kvIterator = - new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); assertFalse(kvIterator.hasNext()); } http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java index 92e76ec..b8de3d3 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java @@ -1,32 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.redis.state; -import org.apache.storm.Config; -import org.apache.storm.state.State; -import org.junit.Assert; -import org.junit.Test; +package org.apache.storm.redis.state; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.storm.Config; +import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; /** * Unit tests for {@link RedisKeyValueStateProvider} @@ -49,8 +41,8 @@ public class RedisKeyValueStateProviderTest { RedisKeyValueStateProvider provider = new RedisKeyValueStateProvider(); Map<String, Object> topoConf = new HashMap<>(); topoConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + - " \"jedisPoolConfig\":" + - "{\"host\":\"localhost\", \"port\":1000}}"); + " \"jedisPoolConfig\":" + + "{\"host\":\"localhost\", \"port\":1000}}"); RedisKeyValueStateProvider.StateConfig config = provider.getStateConfig(topoConf); //System.out.println(config); http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java index 480001c..db98a39 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java @@ -1,23 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.redis.state; import com.google.common.primitives.UnsignedBytes; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.storm.redis.common.commands.RedisCommands; import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer; import org.apache.storm.state.DefaultStateSerializer; @@ -29,14 +30,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import redis.clients.util.SafeEncoder; -import java.util.HashMap; -import java.util.Arrays; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentSkipListMap; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; /** * Unit tests for {@link RedisKeyValueState} @@ -53,7 +48,7 @@ public class RedisKeyValueStateTest { @Before public void setUp() { final NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap = - new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator()); + new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator()); mockContainer = Mockito.mock(RedisCommandsInstanceContainer.class); mockCommands = Mockito.mock(RedisCommands.class); Mockito.when(mockContainer.getInstance()).thenReturn(mockCommands); @@ -62,78 +57,78 @@ public class RedisKeyValueStateTest { ArgumentCaptor<Map> mapArgumentCaptor = ArgumentCaptor.forClass(Map.class); Mockito.when(mockCommands.exists(Mockito.any(byte[].class))) - .thenAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - return exists(mockMap, (byte[]) args[0]); - } - }); + .thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return exists(mockMap, (byte[]) args[0]); + } + }); Mockito.when(mockCommands.del(Mockito.any(byte[].class))) - .thenAnswer(new Answer<Long>() { - @Override - public Long answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - return del(mockMap, (byte[]) args[0]); - } - }); + .thenAnswer(new Answer<Long>() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return del(mockMap, (byte[]) args[0]); + } + }); Mockito.when(mockCommands.hmset(Mockito.any(byte[].class), Mockito.anyMap())) - .thenAnswer(new Answer<String>() { - @Override - public String answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - return hmset(mockMap, (byte[]) args[0], (Map<byte[], byte[]>) args[1]); - } - }); + .thenAnswer(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return hmset(mockMap, (byte[]) args[0], (Map<byte[], byte[]>) args[1]); + } + }); Mockito.when(mockCommands.hget(Mockito.any(byte[].class), Mockito.any(byte[].class))) - .thenAnswer(new Answer<byte[]>() { - @Override - public byte[] answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - return hget(mockMap, (byte[]) args[0], (byte[]) args[1]); - } - }); + .thenAnswer(new Answer<byte[]>() { + @Override + public byte[] answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return hget(mockMap, (byte[]) args[0], (byte[]) args[1]); + } + }); Mockito.when(mockCommands.hdel(Mockito.any(byte[].class), Mockito.<byte[]>anyVararg())) - .thenAnswer(new Answer<Long>() { - @Override - public Long answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - int argsSize = args.length; - byte[][] fields = Arrays.asList(args).subList(1, argsSize).toArray(new byte[argsSize - 1][]); - return hdel(mockMap, (byte[]) args[0], fields); - } - }); + .thenAnswer(new Answer<Long>() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + int argsSize = args.length; + byte[][] fields = Arrays.asList(args).subList(1, argsSize).toArray(new byte[argsSize - 1][]); + return hdel(mockMap, (byte[]) args[0], fields); + } + }); Mockito.when(mockCommands.exists(Mockito.anyString())) - .thenAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - return exists(mockMap, (String) args[0]); - } - }); + .thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return exists(mockMap, (String) args[0]); + } + }); Mockito.when(mockCommands.hmset(Mockito.anyString(), Mockito.anyMap())) - .thenAnswer(new Answer<String>() { - @Override - public String answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - return hmset(mockMap, (String) args[0], (Map<String, String>) args[1]); - } - }); + .thenAnswer(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return hmset(mockMap, (String) args[0], (Map<String, String>) args[1]); + } + }); Mockito.when(mockCommands.hgetAll(Mockito.anyString())) - .thenAnswer(new Answer<Map<String, String>>() { - @Override - public Map<String, String> answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - return hgetAll(mockMap, (String) args[0]); - } - }); + .thenAnswer(new Answer<Map<String, String>>() { + @Override + public Map<String, String> answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return hgetAll(mockMap, (String) args[0]); + } + }); keyValueState = new RedisKeyValueState<String, String>("test", mockContainer, new DefaultStateSerializer<String>(), new DefaultStateSerializer<String>()); @@ -167,38 +162,38 @@ public class RedisKeyValueStateTest { keyValueState.put("b", "2"); keyValueState.prepareCommit(1); keyValueState.put("c", "3"); - assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + assertArrayEquals(new String[]{ "1", "2", "3" }, getValues()); keyValueState.rollback(); - assertArrayEquals(new String[]{null, null, null}, getValues()); + assertArrayEquals(new String[]{ null, null, null }, getValues()); keyValueState.put("a", "1"); keyValueState.put("b", "2"); keyValueState.prepareCommit(1); keyValueState.commit(1); keyValueState.put("c", "3"); - assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + assertArrayEquals(new String[]{ "1", "2", "3" }, getValues()); keyValueState.rollback(); - assertArrayEquals(new String[]{"1", "2", null}, getValues()); + assertArrayEquals(new String[]{ "1", "2", null }, getValues()); keyValueState.put("c", "3"); assertEquals("2", keyValueState.delete("b")); assertEquals("3", keyValueState.delete("c")); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); keyValueState.prepareCommit(2); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); keyValueState.commit(2); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); keyValueState.put("b", "2"); keyValueState.prepareCommit(3); keyValueState.put("c", "3"); - assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + assertArrayEquals(new String[]{ "1", "2", "3" }, getValues()); keyValueState.rollback(); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); } private String[] getValues() { return new String[]{ - keyValueState.get("a"), - keyValueState.get("b"), - keyValueState.get("c") + keyValueState.get("a"), + keyValueState.get("b"), + keyValueState.get("c") }; } @@ -221,10 +216,11 @@ public class RedisKeyValueStateTest { } private Long del(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] key) { - if (mockMap.remove(key) == null) + if (mockMap.remove(key) == null) { return 0L; - else + } else { return 1L; + } } private byte[] hget(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] namespace, byte[] key) { @@ -234,9 +230,9 @@ public class RedisKeyValueStateTest { return null; } - private Long hdel(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] namespace, byte[] ... keys) { + private Long hdel(NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap, byte[] namespace, byte[]... keys) { Long count = 0L; - for (byte[] key: keys) { + for (byte[] key : keys) { if (mockMap.get(namespace).remove(key) != null) count++; } return count;
